
remove comments #1279

Draft
wants to merge 4 commits into base: main
1 change: 0 additions & 1 deletion block/balance.go
@@ -14,7 +14,6 @@ import (

const CheckBalancesInterval = 3 * time.Minute

// MonitorBalances checks the balances of the node and updates the gauges for prometheus
func (m *Manager) MonitorBalances(ctx context.Context) error {
ticker := time.NewTicker(CheckBalancesInterval)
defer ticker.Stop()
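For reference, a minimal sketch of the ticker-driven loop that MonitorBalances uses; the checkBalances callback and package name are assumptions for illustration, not code from this repository.

```go
package balance

import (
	"context"
	"log"
	"time"
)

const CheckBalancesInterval = 3 * time.Minute

// monitorBalances runs checkBalances on a fixed interval until the context is cancelled.
func monitorBalances(ctx context.Context, checkBalances func() error) error {
	ticker := time.NewTicker(CheckBalancesInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			if err := checkBalances(); err != nil {
				// log and keep monitoring rather than aborting the loop
				log.Printf("check balances: %v", err)
			}
		}
	}
}
```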
63 changes: 6 additions & 57 deletions block/block.go
@@ -11,12 +11,11 @@ import (
"github.com/dymensionxyz/dymint/types"
)

// applyBlockWithFraudHandling calls applyBlock and validateBlockBeforeApply with fraud handling.
func (m *Manager) applyBlockWithFraudHandling(block *types.Block, commit *types.Commit, blockMetaData types.BlockMetaData) error {
validateWithFraud := func() error {
if err := m.validateBlockBeforeApply(block, commit); err != nil {
m.blockCache.Delete(block.Header.Height)
// TODO: can we take an action here such as dropping the peer / reducing their reputation?

return fmt.Errorf("block not valid at height %d, dropping it: err:%w", block.Header.Height, err)
}

@@ -29,27 +28,15 @@ func (m *Manager) applyBlockWithFraudHandling(block *types.Block, commit *types.

err := validateWithFraud()
if errors.Is(err, gerrc.ErrFault) {
// Here we handle the fault by calling the fraud handler.
// FraudHandler is an interface that defines a method to handle faults. Implement this interface to handle faults
// in specific ways. For example, once a fault is detected, it publishes a DataHealthStatus event to the
// pubsub which sets the node in a frozen state.
m.FraudHandler.HandleFault(m.Ctx, err)
}

return err
}
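A minimal sketch (not part of this diff) of the fraud-handling shape referred to above: a FraudHandler interface whose implementation might publish a health-status event and freeze the node. The freezeHandler type, publish callback, and the ErrFault stand-in are assumptions for illustration.

```go
package fraud

import (
	"context"
	"errors"
	"fmt"
)

// ErrFault stands in for gerrc.ErrFault used by applyBlockWithFraudHandling.
var ErrFault = errors.New("fault")

// FraudHandler exposes a single method that reacts to a detected fault.
type FraudHandler interface {
	HandleFault(ctx context.Context, fault error)
}

// freezeHandler is a hypothetical implementation that hands the fault to a
// callback, e.g. one that publishes a DataHealthStatus event to a pubsub.
type freezeHandler struct {
	publish func(err error)
}

func (h freezeHandler) HandleFault(_ context.Context, fault error) {
	h.publish(fault)
}

// dispatch mirrors the call site above: only fault-class errors reach the handler.
func dispatch(ctx context.Context, h FraudHandler, err error) error {
	if errors.Is(err, ErrFault) {
		h.HandleFault(ctx, fmt.Errorf("fault detected: %w", err))
	}
	return err
}
```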

// applyBlock applies the block to the store and the abci app.
// Contract: block and commit must be validated before calling this function!
// steps: save block -> execute block with app -> update state -> commit block to app -> update state's height and commit result.
// As the entire process can't be atomic we need to make sure the following condition apply before
// - block height is the expected block height on the store (height + 1).
// - block height is the expected block height on the app (last block height + 1).
func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMetaData types.BlockMetaData) error {
var retainHeight int64

// TODO: add switch case to have defined behavior for each case.
// validate block height
if block.Header.Height != m.State.NextHeight() {
return types.ErrInvalidBlockHeight
}
@@ -58,13 +45,11 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta

m.logger.Debug("Applying block", "height", block.Header.Height, "source", blockMetaData.Source.String())

// Check if the app's last block height is the same as the currently produced block height
isBlockAlreadyApplied, err := m.isHeightAlreadyApplied(block.Header.Height)
if err != nil {
return fmt.Errorf("check if block is already applied: %w", err)
}
// In case the following true, it means we crashed after the app commit but before updating the state
// In that case we'll want to align the state with the app commit result, as if the block was applied.

if isBlockAlreadyApplied {
err := m.UpdateStateFromApp(block.Header.Hash())
if err != nil {
@@ -73,7 +58,7 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
m.logger.Info("updated state from app commit", "height", block.Header.Height)
} else {
var appHash []byte
// Start applying the block assuming no inconsistency was found.

_, err = m.Store.SaveBlock(block, commit, nil)
if err != nil {
return fmt.Errorf("save block: %w", err)
@@ -104,71 +89,44 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
return fmt.Errorf("add drs version: %w", err)
}

// Commit block to app
appHash, retainHeight, err = m.Executor.Commit(m.State, block, responses)
if err != nil {
return fmt.Errorf("commit block: %w", err)
}

// Prune old heights, if requested by ABCI app.
// retainHeight is determined by currentHeight - min-retain-blocks (app.toml config).
// Unless max_age_num_blocks in consensus params is higher than min-retain-block, then max_age_num_blocks will be used instead of min-retain-blocks.

if 0 < retainHeight {
select {
case m.pruningC <- retainHeight:
default:
m.logger.Debug("pruning channel full. skipping pruning", "retainHeight", retainHeight)
}
}
// Update the state with the new app hash, and store height from the commit.
// Every one of those, if happens before commit, prevents us from re-executing the block in case failed during commit.

m.Executor.UpdateStateAfterCommit(m.State, responses, appHash, block.Header.Height, block.Header.Hash())

}

// save last block time used to calculate batch skew time
m.LastBlockTime.Store(block.Header.GetTimestamp().UTC().UnixNano())
// Update the store:
// 1. Save the proposer for the current height to the store.
// 2. Update the proposer in the state in case of rotation.
// 3. Save the state to the store (independently of the height). Here the proposer might differ from (1).
// 4. Save the last block sequencer set to the store if it's present (only applicable in the sequencer mode).
// here, (3) helps properly handle reboots (specifically when there's rotation).
// If reboot happens after block H (which rotates seqA -> seqB):
// - Block H+1 will be signed by seqB.
// - The state must have seqB as proposer.

// Proposer cannot be empty while applying the block

proposer := m.State.GetProposer()
if proposer == nil {
return fmt.Errorf("logic error: got nil proposer while applying block")
}

batch := m.Store.NewBatch()

// 1. Save the proposer for the current height to the store.
// Proposer in the store is used for RPC queries.
batch, err = m.Store.SaveProposer(block.Header.Height, *proposer, batch)
if err != nil {
return fmt.Errorf("save proposer: %w", err)
}

// 2. Update the proposer in the state in case of rotation happened on the rollapp level (not necessarily on the hub yet).
isProposerUpdated := m.Executor.UpdateProposerFromBlock(m.State, m.Sequencers, block)

// 3. Save the state to the store (independently of the height). Here the proposer might differ from (1).
batch, err = m.Store.SaveState(m.State, batch)
if err != nil {
return fmt.Errorf("update state: %w", err)
}

// 4. Save the last block sequencer set to the store if it's present (only applicable in the sequencer mode).
// The set from the state is dumped to memory on reboots. It helps to avoid sending unnecessary
// UspertSequencer consensus messages on reboots. This is not a 100% solution, because the sequencer set
// is not persisted in the store in full node mode. It's only used in the proposer mode. Therefore,
// on rotation from the full node to the proposer, the sequencer set is duplicated as consensus msgs.
// Though single-time duplication it's not a big deal.
if len(blockMetaData.SequencerSet) != 0 {
batch, err = m.Store.SaveLastBlockSequencerSet(blockMetaData.SequencerSet, batch)
if err != nil {
Expand All @@ -185,33 +143,25 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta

m.blockCache.Delete(block.Header.Height)

// validate whether configuration params and rollapp consensus params keep in line, after rollapp params are updated from the responses received in the block execution
err = m.ValidateConfigWithRollappParams()
if err != nil {
return err
}

// Check if there was an Update for the proposer and if I am the new proposer.
// If so, restart so I can start as the proposer.
// For current proposer, we don't want to restart because we still need to send the last batch.
// This will be done as part of the `rotate` function.
if isProposerUpdated && m.AmIProposerOnRollapp() {
panic("I'm the new Proposer now. restarting as a proposer")
}

return nil
}
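A minimal sketch (not from this diff) of the non-blocking pruning dispatch used in applyBlock above: the retain height returned by the app's Commit is handed to a pruning goroutine only if its channel has room, so block application never blocks on pruning. The manager type, channel, and pruneLoop helper are assumptions for illustration.

```go
package pruning

import "log"

type manager struct {
	pruningC chan int64
}

// requestPruning forwards a positive retain height without blocking the caller.
func (m *manager) requestPruning(retainHeight int64) {
	if retainHeight <= 0 {
		return // the app did not ask for pruning
	}
	select {
	case m.pruningC <- retainHeight:
	default:
		log.Printf("pruning channel full, skipping pruning at retainHeight %d", retainHeight)
	}
}

// pruneLoop would run in its own goroutine, pruning up to each received height.
func (m *manager) pruneLoop(prune func(retainHeight int64)) {
	for h := range m.pruningC {
		prune(h)
	}
}
```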

// isHeightAlreadyApplied checks if the block height is already applied to the app.
func (m *Manager) isHeightAlreadyApplied(blockHeight uint64) (bool, error) {
proxyAppInfo, err := m.Executor.GetAppInfo()
if err != nil {
return false, errorsmod.Wrap(err, "get app info")
}

isBlockAlreadyApplied := uint64(proxyAppInfo.LastBlockHeight) == blockHeight //nolint:gosec // LastBlockHeight is always positive

// TODO: add switch case to validate better the current app state
isBlockAlreadyApplied := uint64(proxyAppInfo.LastBlockHeight) == blockHeight

return isBlockAlreadyApplied, nil
}
@@ -240,7 +190,6 @@ func (m *Manager) attemptApplyCachedBlocks() error {
return nil
}

// This function validates the block and commit against the state before applying it.
func (m *Manager) validateBlockBeforeApply(block *types.Block, commit *types.Commit) error {
return types.ValidateProposedTransition(m.State, block, commit, m.State.GetProposerPubKey())
}
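A minimal sketch (not from this diff) of the store-update ordering that applyBlock follows once a block is applied: save the per-height proposer, apply any rotation to the state, persist the state, and persist the sequencer set, all in one batch, so that a reboot after rotation still starts with the new proposer. The Store and Batch interfaces below are assumptions, not the repository's actual API.

```go
package persist

type Sequencer struct{ Address string }

type State struct{ Proposer Sequencer }

type Batch interface {
	Commit() error
}

type Store interface {
	NewBatch() Batch
	SaveProposer(height uint64, p Sequencer, b Batch) (Batch, error)
	SaveState(s *State, b Batch) (Batch, error)
	SaveLastBlockSequencerSet(set []Sequencer, b Batch) (Batch, error)
}

func persistAfterApply(store Store, s *State, height uint64, rotatedTo *Sequencer, seqSet []Sequencer) error {
	batch := store.NewBatch()

	// 1. proposer for this height, used by RPC queries
	batch, err := store.SaveProposer(height, s.Proposer, batch)
	if err != nil {
		return err
	}

	// 2. apply a rotation signalled by the block, if any
	if rotatedTo != nil {
		s.Proposer = *rotatedTo
	}

	// 3. persist the (possibly rotated) state so a reboot sees the new proposer
	batch, err = store.SaveState(s, batch)
	if err != nil {
		return err
	}

	// 4. persist the sequencer set when present (sequencer mode only)
	if len(seqSet) != 0 {
		batch, err = store.SaveLastBlockSequencerSet(seqSet, batch)
		if err != nil {
			return err
		}
	}

	return batch.Commit()
}
```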
1 change: 0 additions & 1 deletion block/block_cache.go
@@ -5,7 +5,6 @@ import (
)

type Cache struct {
// concurrency managed by Manager.retrieverMu mutex
cache map[uint64]types.CachedBlock
}

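A minimal sketch (not from this diff) of the locking convention around Cache: the cache itself holds no mutex, and access is serialized by the Manager's retrieverMu. Field and method names here are illustrative assumptions.

```go
package cache

import "sync"

type CachedBlock struct {
	Height uint64
}

type Cache struct {
	// no internal locking; callers must hold Manager.retrieverMu
	cache map[uint64]CachedBlock
}

type Manager struct {
	retrieverMu sync.Mutex
	blockCache  *Cache
}

// deleteCachedBlock shows the external-locking pattern: take retrieverMu,
// then touch the unsynchronized map.
func (m *Manager) deleteCachedBlock(height uint64) {
	m.retrieverMu.Lock()
	defer m.retrieverMu.Unlock()
	delete(m.blockCache.cache, height)
}
```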
1 change: 0 additions & 1 deletion block/consensus.go
@@ -47,7 +47,6 @@ func ConsensusMsgSigner(m proto.Message) (sdk.AccAddress, error) {
}
}

// ConsensusMsgsOnSequencerSetUpdate forms a list of consensus messages to handle the sequencer set update.
func ConsensusMsgsOnSequencerSetUpdate(newSequencers []types.Sequencer) ([]proto.Message, error) {
msgs := make([]proto.Message, 0, len(newSequencers))
for _, s := range newSequencers {
30 changes: 7 additions & 23 deletions block/executor.go
@@ -19,7 +19,6 @@
protoutils "github.com/dymensionxyz/dymint/utils/proto"
)

// default minimum block max size allowed. not specific reason to set it to 10K, but we need to avoid no transactions can be included in a block.
const minBlockMaxBytes = 10000

type ExecutorI interface {
@@ -33,15 +32,12 @@
UpdateStateAfterCommit(s *types.State, resp *tmstate.ABCIResponses, appHash []byte, height uint64, lastHeaderHash [32]byte)
UpdateProposerFromBlock(s *types.State, seqSet *types.SequencerSet, block *types.Block) bool

/* Consensus Messages */

AddConsensusMsgs(...proto2.Message)
GetConsensusMsgs() []proto2.Message
}

var _ ExecutorI = new(Executor)

// Executor creates and applies blocks and maintains state.
type Executor struct {
localAddress []byte
chainID string
@@ -55,8 +51,6 @@
logger types.Logger
}

// NewExecutor creates new instance of BlockExecutor.
// localAddress will be used in sequencer mode only.
func NewExecutor(
localAddress []byte,
chainID string,
@@ -79,23 +73,17 @@
return &be, nil
}

// AddConsensusMsgs adds new consensus msgs to the queue.
// The method is thread-safe.
func (e *Executor) AddConsensusMsgs(msgs ...proto2.Message) {
e.consensusMsgQueue.Add(msgs...)
}

// GetConsensusMsgs dequeues consensus msgs from the queue.
// The method is thread-safe.
func (e *Executor) GetConsensusMsgs() []proto2.Message {
return e.consensusMsgQueue.Get()
}

// InitChain calls InitChainSync using consensus connection to app.
func (e *Executor) InitChain(genesis *tmtypes.GenesisDoc, genesisChecksum string, valset []*tmtypes.Validator) (*abci.ResponseInitChain, error) {
valUpdates := abci.ValidatorUpdates{}

// prepare the validator updates as expected by the ABCI app
for _, validator := range valset {
tmkey, err := tmcrypto.PubKeyToProto(validator.PubKey)
if err != nil {
@@ -136,16 +124,15 @@
})
}

// CreateBlock reaps transactions from mempool and builds a block.
func (e *Executor) CreateBlock(
height uint64,
lastCommit *types.Commit,
lastHeaderHash, nextSeqHash [32]byte,
state *types.State,
maxBlockDataSizeBytes uint64,
) *types.Block {
maxBlockDataSizeBytes = min(maxBlockDataSizeBytes, uint64(max(minBlockMaxBytes, state.ConsensusParams.Block.MaxBytes))) //nolint:gosec // MaxBytes is always positive
mempoolTxs := e.mempool.ReapMaxBytesMaxGas(int64(maxBlockDataSizeBytes), state.ConsensusParams.Block.MaxGas) //nolint:gosec // size is always positive and falls in int64
maxBlockDataSizeBytes = min(maxBlockDataSizeBytes, uint64(max(minBlockMaxBytes, state.ConsensusParams.Block.MaxBytes)))
mempoolTxs := e.mempool.ReapMaxBytesMaxGas(int64(maxBlockDataSizeBytes), state.ConsensusParams.Block.MaxGas)

block := &types.Block{
Header: types.Header{
@@ -178,7 +165,6 @@
return block
}

// Commit commits the block
func (e *Executor) Commit(state *types.State, block *types.Block, resp *tmstate.ABCIResponses) ([]byte, int64, error) {
appHash, retainHeight, err := e.commit(state, block, resp.DeliverTxs)
if err != nil {
@@ -193,7 +179,6 @@
return appHash, retainHeight, nil
}

// GetAppInfo returns the latest AppInfo from the proxyApp.
func (e *Executor) GetAppInfo() (*abci.ResponseInfo, error) {
return e.proxyAppQueryConn.InfoSync(abci.RequestInfo{})
}
@@ -214,7 +199,7 @@

maxBytes := state.ConsensusParams.Block.MaxBytes
maxGas := state.ConsensusParams.Block.MaxGas
err = e.mempool.Update(int64(block.Header.Height), fromDymintTxs(block.Data.Txs), deliverTxs) //nolint:gosec // height is non-negative and falls in int64
err = e.mempool.Update(int64(block.Header.Height), fromDymintTxs(block.Data.Txs), deliverTxs)
if err != nil {
return nil, 0, err
}
@@ -224,7 +209,6 @@
return resp.Data, resp.RetainHeight, err
}

// ExecuteBlock executes the block and returns the ABCIResponses. Block should be valid (passed validation checks).
func (e *Executor) ExecuteBlock(block *types.Block) (*tmstate.ABCIResponses, error) {
abciResponses := new(tmstate.ABCIResponses)
abciResponses.DeliverTxs = make([]*abci.ResponseDeliverTx, len(block.Data.Txs))
@@ -273,7 +257,7 @@
}
}

abciResponses.EndBlock, err = e.proxyAppConsensusConn.EndBlockSync(abci.RequestEndBlock{Height: int64(block.Header.Height)}) //nolint:gosec // height is non-negative and falls in int64
abciResponses.EndBlock, err = e.proxyAppConsensusConn.EndBlockSync(abci.RequestEndBlock{Height: int64(block.Header.Height)})
if err != nil {
return nil, err
}
@@ -305,14 +289,14 @@
for _, ev := range abciBlock.Evidence.Evidence {
err = multierr.Append(err, e.eventBus.PublishEventNewEvidence(tmtypes.EventDataNewEvidence{
Evidence: ev,
Height: int64(block.Header.Height), //nolint:gosec // height is non-negative and falls in int64
Height: int64(block.Header.Height),
}))
}
for i, dtx := range resp.DeliverTxs {
err = multierr.Append(err, e.eventBus.PublishEventTx(tmtypes.EventDataTx{
TxResult: abci.TxResult{
Height: int64(block.Header.Height), //nolint:gosec // block height is within int64 range
Index: uint32(i), //nolint:gosec // num of deliver txs is less than 2^32
Height: int64(block.Header.Height),
Index: uint32(i),

Check failure on line 299 in block/executor.go (GitHub Actions / golangci-lint): G115: integer overflow conversion int -> uint32 (gosec)
Tx: abciBlock.Data.Txs[i],
Result: *dtx,
},
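A minimal sketch (not from this diff) of one way to address the G115 finding above once the //nolint directives are removed: bound-check before converting instead of suppressing the lint. The helper names and package are assumptions, not code from this repository.

```go
package conv

import (
	"fmt"
	"math"
)

// intToUint32 rejects values that would overflow uint32 instead of silently wrapping.
func intToUint32(i int) (uint32, error) {
	if i < 0 || uint64(i) > math.MaxUint32 {
		return 0, fmt.Errorf("value %d out of uint32 range", i)
	}
	return uint32(i), nil
}

// uint64ToInt64 rejects values that would overflow int64.
func uint64ToInt64(u uint64) (int64, error) {
	if u > math.MaxInt64 {
		return 0, fmt.Errorf("value %d out of int64 range", u)
	}
	return int64(u), nil
}
```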