Skip to content

Commit

Permalink
feat: block production misbehavior detection (#1071)
Browse files Browse the repository at this point in the history
Co-authored-by: Faulty Tolly <@faulttolerance.net>
  • Loading branch information
faultytolly authored Oct 22, 2024
1 parent 57bc89e commit 9aef1d0
Show file tree
Hide file tree
Showing 28 changed files with 2,005 additions and 119 deletions.
8 changes: 4 additions & 4 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ packages:
github.com/dymensionxyz/dymint/p2p:
interfaces:
GetProposerI:




github.com/dymensionxyz/dymint/block:
interfaces:
ExecutorI:
FraudHandler:
49 changes: 40 additions & 9 deletions block/block.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,50 @@
package block

import (
"context"
"errors"
"fmt"

"github.com/dymensionxyz/gerr-cosmos/gerrc"

errorsmod "cosmossdk.io/errors"

"github.com/dymensionxyz/dymint/types"
)

// applyBlockWithFraudHandling validates the block and, when it is valid, applies it,
// handing any detected fault to the manager's FraudHandler.
//
// The block is first checked with validateBlockBeforeApply. If that fails, the block is
// removed from the block cache and the wrapped validation error is returned without
// applying it. Otherwise the block is applied with applyBlock.
//
// If the resulting error matches gerrc.ErrFault, FraudHandler.HandleFault is called with it.
// Every error, fault or not, is returned to the caller, so a failed validation or
// application is never reported as success.
func (m *Manager) applyBlockWithFraudHandling(block *types.Block, commit *types.Commit, blockMetaData types.BlockMetaData) error {
	validateAndApply := func() error {
		if err := m.validateBlockBeforeApply(block, commit); err != nil {
			m.blockCache.Delete(block.Header.Height)
			// TODO: can we take an action here such as dropping the peer / reducing their reputation?

			return fmt.Errorf("block not valid at height %d, dropping it: err:%w", block.Header.Height, err)
		}

		if err := m.applyBlock(block, commit, blockMetaData); err != nil {
			return fmt.Errorf("apply block: %w", err)
		}

		return nil
	}

	err := validateAndApply()
	if errors.Is(err, gerrc.ErrFault) {
		// Here we handle the fault by calling the fraud handler.
		// FraudHandler is an interface that defines a method to handle faults. Implement this interface to handle faults
		// in specific ways. For example, once a fault is detected, it publishes a DataHealthStatus event to the
		// pubsub which sets the node in a frozen state.
		m.FraudHandler.HandleFault(context.Background(), err)
	}

	// Return the error unconditionally: non-fault failures must reach the caller too.
	return err
}

// applyBlock applies the block to the store and the abci app.
// Contract: block and commit must be validated before calling this function!
// steps: save block -> execute block with app -> update state -> commit block to app -> update state's height and commit result.
Expand Down Expand Up @@ -35,7 +72,7 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
// In case the following true, it means we crashed after the app commit but before updating the state
// In that case we'll want to align the state with the app commit result, as if the block was applied.
if isBlockAlreadyApplied {
err := m.UpdateStateFromApp()
err := m.UpdateStateFromApp(block.Header.Hash())
if err != nil {
return fmt.Errorf("update state from app: %w", err)
}
Expand Down Expand Up @@ -83,7 +120,7 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta

// Update the state with the new app hash, and store height from the commit.
// Every one of those, if happens before commit, prevents us from re-executing the block in case failed during commit.
m.Executor.UpdateStateAfterCommit(m.State, responses, appHash, block.Header.Height)
m.Executor.UpdateStateAfterCommit(m.State, responses, appHash, block.Header.Height, block.Header.Hash())
}

// check if the proposer needs to be changed
Expand Down Expand Up @@ -150,18 +187,12 @@ func (m *Manager) attemptApplyCachedBlocks() error {
if !blockExists {
break
}
if err := m.validateBlockBeforeApply(cachedBlock.Block, cachedBlock.Commit); err != nil {
m.blockCache.Delete(cachedBlock.Block.Header.Height)
// TODO: can we take an action here such as dropping the peer / reducing their reputation?
return fmt.Errorf("block not valid at height %d, dropping it: err:%w", cachedBlock.Block.Header.Height, err)
}

err := m.applyBlock(cachedBlock.Block, cachedBlock.Commit, types.BlockMetaData{Source: cachedBlock.Source})
err := m.applyBlockWithFraudHandling(cachedBlock.Block, cachedBlock.Commit, types.BlockMetaData{Source: cachedBlock.Source})
if err != nil {
return fmt.Errorf("apply cached block: expected height: %d: %w", expectedHeight, err)
}
m.logger.Info("Block applied", "height", expectedHeight)

}

return nil
Expand Down
14 changes: 13 additions & 1 deletion block/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,18 @@ import (
// minBlockMaxBytes is the default minimum allowed value for the maximum block size.
// There is no specific reason for choosing 10K; a lower bound is needed so that a block can always include transactions.
const minBlockMaxBytes = 10000

// ExecutorI is the interface for creating, executing and committing blocks and for
// updating the state afterwards. Declaring it as an interface lets the concrete
// executor be replaced by another implementation, such as a mock.
type ExecutorI interface {
InitChain(genesis *tmtypes.GenesisDoc, valset []*tmtypes.Validator) (*abci.ResponseInitChain, error)
CreateBlock(height uint64, lastCommit *types.Commit, lastHeaderHash, nextSeqHash [32]byte, state *types.State, maxBlockDataSizeBytes uint64) *types.Block
Commit(state *types.State, block *types.Block, resp *tmstate.ABCIResponses) ([]byte, int64, error)
GetAppInfo() (*abci.ResponseInfo, error)
ExecuteBlock(state *types.State, block *types.Block) (*tmstate.ABCIResponses, error)
UpdateStateAfterInitChain(s *types.State, res *abci.ResponseInitChain)
UpdateMempoolAfterInitChain(s *types.State)
// UpdateStateAfterCommit receives the hash of the committed block's header as lastHeaderHash.
UpdateStateAfterCommit(s *types.State, resp *tmstate.ABCIResponses, appHash []byte, height uint64, lastHeaderHash [32]byte)
UpdateProposerFromBlock(s *types.State, block *types.Block) bool
}

// Executor creates and applies blocks and maintains state.
type Executor struct {
localAddress []byte
Expand All @@ -46,7 +58,7 @@ func NewExecutor(
eventBus *tmtypes.EventBus,
consensusMessagesStream ConsensusMessagesStream,
logger types.Logger,
) (*Executor, error) {
) (ExecutorI, error) {
be := Executor{
localAddress: localAddress,
chainID: chainID,
Expand Down
26 changes: 14 additions & 12 deletions block/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,11 @@ func TestApplyBlock(t *testing.T) {

logger := log.TestingLogger()

// Create a valid proposer for the block
proposerKey := ed25519.GenPrivKey()
tmPubKey, err := cryptocodec.ToTmPubKeyInterface(proposerKey.PubKey())
require.NoError(err)

// Mock ABCI app
app := &tmmocks.MockApplication{}
app.On("CheckTx", mock.Anything).Return(abci.ResponseCheckTx{})
Expand All @@ -201,7 +206,7 @@ func TestApplyBlock(t *testing.T) {
},
})
var mockAppHash [32]byte
_, err := rand.Read(mockAppHash[:])
_, err = rand.Read(mockAppHash[:])
require.NoError(err)
app.On("Commit", mock.Anything).Return(abci.ResponseCommit{
Data: mockAppHash[:],
Expand Down Expand Up @@ -229,7 +234,7 @@ func TestApplyBlock(t *testing.T) {
appConns := &tmmocksproxy.MockAppConns{}
appConns.On("Consensus").Return(abciClient)
appConns.On("Query").Return(abciClient)
executor, err := block.NewExecutor([]byte("test address"), chainID, mpool, appConns, eventBus, nil, logger)
executor, err := block.NewExecutor(proposerKey.PubKey().Address(), chainID, mpool, appConns, eventBus, nil, logger)
assert.NoError(err)

// Subscribe to tx events
Expand All @@ -246,25 +251,22 @@ func TestApplyBlock(t *testing.T) {
require.NoError(err)
require.NotNil(headerSub)

// Create a valid proposer for the block
proposerKey := ed25519.GenPrivKey()
tmPubKey, err := cryptocodec.ToTmPubKeyInterface(proposerKey.PubKey())
require.NoError(err)

// Init state
state := &types.State{}
state.Sequencers.SetProposer(types.NewSequencerFromValidator(*tmtypes.NewValidator(tmPubKey, 1)))
state.InitialHeight = 1
state.ChainID = chainID
state.SetHeight(0)
maxBytes := uint64(10000)
state.ConsensusParams.Block.MaxBytes = int64(maxBytes)
state.ConsensusParams.Block.MaxGas = 100000
state.RollappParams.Da = "mock"
state.LastHeaderHash = [32]byte{0x01}

// Create first block with one Tx from mempool
_ = mpool.CheckTx([]byte{1, 2, 3, 4}, func(r *abci.Response) {}, mempool.TxInfo{})
require.NoError(err)
block := executor.CreateBlock(1, &types.Commit{Height: 0}, [32]byte{}, [32]byte(state.Sequencers.ProposerHash()), state, maxBytes)
block := executor.CreateBlock(1, &types.Commit{Height: 0}, [32]byte{0x01}, [32]byte(state.Sequencers.ProposerHash()), state, maxBytes)
require.NotNil(block)
assert.Equal(uint64(1), block.Header.Height)
assert.Len(block.Data.Txs, 1)
Expand All @@ -290,7 +292,7 @@ func TestApplyBlock(t *testing.T) {
require.NotNil(resp)
appHash, _, err := executor.Commit(state, block, resp)
require.NoError(err)
executor.UpdateStateAfterCommit(state, resp, appHash, block.Header.Height)
executor.UpdateStateAfterCommit(state, resp, appHash, block.Header.Height, block.Header.Hash())
assert.Equal(uint64(1), state.Height())
assert.Equal(mockAppHash, state.AppHash)

Expand All @@ -299,7 +301,7 @@ func TestApplyBlock(t *testing.T) {
require.NoError(mpool.CheckTx([]byte{5, 6, 7, 8, 9}, func(r *abci.Response) {}, mempool.TxInfo{}))
require.NoError(mpool.CheckTx([]byte{1, 2, 3, 4, 5}, func(r *abci.Response) {}, mempool.TxInfo{}))
require.NoError(mpool.CheckTx(make([]byte, 9990), func(r *abci.Response) {}, mempool.TxInfo{}))
block = executor.CreateBlock(2, commit, [32]byte{}, [32]byte(state.Sequencers.ProposerHash()), state, maxBytes)
block = executor.CreateBlock(2, commit, block.Header.Hash(), [32]byte(state.Sequencers.ProposerHash()), state, maxBytes)
require.NotNil(block)
assert.Equal(uint64(2), block.Header.Height)
assert.Len(block.Data.Txs, 3)
Expand All @@ -321,7 +323,7 @@ func TestApplyBlock(t *testing.T) {

// Apply the block with an invalid commit
err = types.ValidateProposedTransition(state, block, invalidCommit, state.Sequencers.GetProposerPubKey())
require.ErrorIs(err, types.ErrInvalidSignature)
require.ErrorContains(err, types.ErrInvalidSignature.Error())

// Create a valid commit for the block
signature, err = proposerKey.Sign(abciHeaderBytes)
Expand All @@ -340,7 +342,7 @@ func TestApplyBlock(t *testing.T) {
require.NotNil(resp)
_, _, err = executor.Commit(state, block, resp)
require.NoError(err)
executor.UpdateStateAfterCommit(state, resp, appHash, block.Header.Height)
executor.UpdateStateAfterCommit(state, resp, appHash, block.Header.Height, block.Header.Hash())
assert.Equal(uint64(2), state.Height())

// check rollapp params update
Expand Down
32 changes: 32 additions & 0 deletions block/fraud.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package block

import (
"context"

"github.com/dymensionxyz/dymint/node/events"
uevent "github.com/dymensionxyz/dymint/utils/event"
)

// FraudHandler is an interface that defines a method to handle faults.
// Contract: implementations should not block.
type FraudHandler interface {
// HandleFault handles a fault that occurred in the system.
// The fault is passed as an error value; ctx is the context the handler runs under.
HandleFault(ctx context.Context, fault error)
}

// FreezeHandler is used to handle faults coming from executing and validating blocks.
// Once a fault is detected, it publishes a DataHealthStatus event to the pubsub, which sets the node in a frozen state.
type FreezeHandler struct {
// m is the block manager whose Pubsub server the health-status event is published to.
m *Manager
}

// HandleFault publishes a DataHealthStatus event carrying the given fault on the
// manager's pubsub server, under the events.HealthStatusList event attributes.
// NOTE(review): MustPublish presumably panics if publishing fails; confirm against utils/event.
func (f FreezeHandler) HandleFault(ctx context.Context, fault error) {
	status := &events.DataHealthStatus{Error: fault}
	uevent.MustPublish(ctx, f.m.Pubsub, status, events.HealthStatusList)
}

// NewFreezeHandler returns a FreezeHandler bound to the given block manager.
func NewFreezeHandler(manager *Manager) *FreezeHandler {
	return &FreezeHandler{m: manager}
}
35 changes: 35 additions & 0 deletions block/fraud_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package block_test

import (
"errors"
"testing"

"github.com/dymensionxyz/gerr-cosmos/gerrc"
)

// mockError is a test-only error type that unwraps to gerrc.ErrFault.
// Its fields are not read by its methods; they only give the value some content.
type mockError struct {
name string
data string
}

// Error implements the error interface with a fixed message.
func (m mockError) Error() string {
return "some string"
}

// Unwrap always returns gerrc.ErrFault, so errors.Is(mockError{}, gerrc.ErrFault) reports true.
func (mockError) Unwrap() error {
return gerrc.ErrFault
}

// TestErrorIsErrFault checks that errors.Is recognises an error unwrapping to
// gerrc.ErrFault as a fault, and does not do so for an unrelated error.
func TestErrorIsErrFault(t *testing.T) {
	// An error whose Unwrap yields gerrc.ErrFault must match.
	var faulty error = mockError{name: "test", data: "test"}
	if matched := errors.Is(faulty, gerrc.ErrFault); !matched {
		t.Error("Expected Is to return true")
	}

	// A plain error with no relation to gerrc.ErrFault must not match.
	unrelated := errors.New("some error")
	if matched := errors.Is(unrelated, gerrc.ErrFault); matched {
		t.Error("Expected Is to return false")
	}
}
21 changes: 14 additions & 7 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,13 @@ import (
tmcrypto "github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/pubsub"
tmtypes "github.com/tendermint/tendermint/types"

"github.com/dymensionxyz/dymint/p2p"

"github.com/tendermint/tendermint/proxy"
tmtypes "github.com/tendermint/tendermint/types"

"github.com/dymensionxyz/dymint/config"
"github.com/dymensionxyz/dymint/da"
"github.com/dymensionxyz/dymint/mempool"
"github.com/dymensionxyz/dymint/p2p"
"github.com/dymensionxyz/dymint/settlement"
"github.com/dymensionxyz/dymint/types"
)
Expand All @@ -46,7 +44,7 @@ type Manager struct {
// Store and execution
Store store.Store
State *types.State
Executor *Executor
Executor ExecutorI

// Clients and servers
Pubsub *pubsub.Server
Expand Down Expand Up @@ -79,6 +77,9 @@ type Manager struct {
// TargetHeight holds the value of the current highest block seen from either p2p (probably higher) or the DA
TargetHeight atomic.Uint64

// Fraud handler
FraudHandler FraudHandler

// channel used to send the retain height to the pruning background loop
pruningC chan int64

Expand Down Expand Up @@ -135,6 +136,7 @@ func NewManager(
},
pruningC: make(chan int64, 10), // use of buffered channel to avoid blocking applyBlock thread. In case channel is full, pruning will be skipped, but the retain height can be pruned in the next iteration.
}
m.setFraudHandler(NewFreezeHandler(m))

err = m.LoadStateOnInit(store, genesis, logger)
if err != nil {
Expand Down Expand Up @@ -197,8 +199,8 @@ func (m *Manager) Start(ctx context.Context) error {
}()

// P2P Sync. Subscribe to P2P received blocks events
go uevent.MustSubscribe(ctx, m.Pubsub, "applyGossipedBlocksLoop", p2p.EventQueryNewGossipedBlock, m.onReceivedBlock, m.logger)
go uevent.MustSubscribe(ctx, m.Pubsub, "applyBlockSyncBlocksLoop", p2p.EventQueryNewBlockSyncBlock, m.onReceivedBlock, m.logger)
go uevent.MustSubscribe(ctx, m.Pubsub, "applyGossipedBlocksLoop", p2p.EventQueryNewGossipedBlock, m.OnReceivedBlock, m.logger)
go uevent.MustSubscribe(ctx, m.Pubsub, "applyBlockSyncBlocksLoop", p2p.EventQueryNewBlockSyncBlock, m.OnReceivedBlock, m.logger)
return nil
}

Expand Down Expand Up @@ -352,3 +354,8 @@ func (m *Manager) setDA(daconfig string, dalcKV store.KV, logger log.Logger) err
m.Retriever = retriever
return nil
}

// setFraudHandler sets the fraud handler for the block manager.
// It accepts any FraudHandler implementation (for example a *FreezeHandler),
// matching the type of the Manager.FraudHandler field it assigns.
func (m *Manager) setFraudHandler(handler FraudHandler) {
	m.FraudHandler = handler
}
Loading

0 comments on commit 9aef1d0

Please sign in to comment.