Skip to content

Commit

Permalink
Rotation fix and update. (#4516)
Browse files Browse the repository at this point in the history
* fixed memory

* error on existed block

* Fixed config to pass tests.

* Cleanup

* Cleanup

* Condition to pass tests.
  • Loading branch information
Frozen authored Oct 17, 2023
1 parent 9f5768a commit 1633656
Show file tree
Hide file tree
Showing 15 changed files with 505 additions and 356 deletions.
9 changes: 9 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -171,3 +171,12 @@ travis_go_checker:

travis_rpc_checker:
bash ./scripts/travis_rpc_checker.sh

travis_rosetta_checker:
bash ./scripts/travis_rosetta_checker.sh

debug_external: clean
bash test/debug-external.sh

build_localnet_validator:
bash test/build-localnet-validator.sh
8 changes: 7 additions & 1 deletion consensus/consensus_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,12 @@ func (consensus *Consensus) getLogger() *zerolog.Logger {
func VerifyNewBlock(hooks *webhooks.Hooks, blockChain core.BlockChain, beaconChain core.BlockChain) func(*types.Block) error {
return func(newBlock *types.Block) error {
if err := blockChain.ValidateNewBlock(newBlock, beaconChain); err != nil {
switch {
case errors.Is(err, core.ErrKnownBlock):
return nil
default:
}

if hooks := hooks; hooks != nil {
if p := hooks.ProtocolIssues; p != nil {
url := p.OnCannotCommit
Expand All @@ -680,7 +686,7 @@ func VerifyNewBlock(hooks *webhooks.Hooks, blockChain core.BlockChain, beaconCha
Int("numStakingTx", len(newBlock.StakingTransactions())).
Err(err).
Msgf("[VerifyNewBlock] Cannot Verify New Block!!!, blockHeight %d, myHeight %d", newBlock.NumberU64(), blockChain.CurrentHeader().NumberU64())
return errors.Errorf(
return errors.WithMessagef(err,
"[VerifyNewBlock] Cannot Verify New Block!!! block-hash %s txn-count %d",
newBlock.Hash().Hex(),
len(newBlock.Transactions()),
Expand Down
17 changes: 17 additions & 0 deletions consensus/consensus_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (

"github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/internal/registry"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"

msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/consensus/quorum"
Expand Down Expand Up @@ -73,3 +75,18 @@ func TestSetViewID(t *testing.T) {
t.Errorf("Cannot set consensus ID. Got: %v, Expected: %v", consensus.GetCurBlockViewID(), height)
}
}

func TestErrors(t *testing.T) {
e1 := errors.New("e1")
require.True(t, errors.Is(e1, e1))

t.Run("wrap", func(t *testing.T) {
e2 := errors.Wrap(e1, "e2")
require.True(t, errors.Is(e2, e1))
})

t.Run("withMessage", func(t *testing.T) {
e2 := errors.WithMessage(e1, "e2")
require.True(t, errors.Is(e2, e1))
})
}
36 changes: 17 additions & 19 deletions consensus/consensus_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/ethereum/go-ethereum/common"
bls2 "github.com/harmony-one/bls/ffi/go/bls"
"github.com/harmony-one/harmony/consensus/signature"
"github.com/harmony-one/harmony/core"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/utils"
"github.com/rs/zerolog"
Expand All @@ -31,7 +32,6 @@ import (
var (
errSenderPubKeyNotLeader = errors.New("sender pubkey doesn't match leader")
errVerifyMessageSignature = errors.New("verify message signature failed")
errParsingFBFTMessage = errors.New("failed parsing FBFT message")
)

// timeout constant
Expand Down Expand Up @@ -578,8 +578,13 @@ func (consensus *Consensus) preCommitAndPropose(blk *types.Block) error {
}

if _, err := consensus.Blockchain().InsertChain([]*types.Block{blk}, !consensus.FBFTLog().IsBlockVerified(blk.Hash())); err != nil {
consensus.getLogger().Error().Err(err).Msg("[preCommitAndPropose] Failed to add block to chain")
return
switch {
case errors.Is(err, core.ErrKnownBlock):
consensus.getLogger().Info().Msg("[preCommitAndPropose] Block already known")
default:
consensus.getLogger().Error().Err(err).Msg("[preCommitAndPropose] Failed to add block to chain")
return
}
}

consensus.getLogger().Info().Msg("[preCommitAndPropose] Start consensus timer")
Expand Down Expand Up @@ -693,7 +698,7 @@ func (consensus *Consensus) rotateLeader(epoch *big.Int) {
prev = consensus.getLeaderPubKey()
leader = consensus.getLeaderPubKey()
)
utils.Logger().Info().Msgf("[Rotating leader] epoch: %v rotation:%v external rotation %v", epoch.Uint64(), bc.Config().IsLeaderRotation(epoch), bc.Config().IsLeaderRotationExternalValidatorsAllowed(epoch, consensus.ShardID))
utils.Logger().Info().Msgf("[Rotating leader] epoch: %v rotation:%v external rotation %v", epoch.Uint64(), bc.Config().IsLeaderRotationInternalValidators(epoch), bc.Config().IsLeaderRotationExternalValidatorsAllowed(epoch))
ss, err := bc.ReadShardState(epoch)
if err != nil {
utils.Logger().Error().Err(err).Msg("Failed to read shard state")
Expand Down Expand Up @@ -721,24 +726,17 @@ func (consensus *Consensus) rotateLeader(epoch *big.Int) {
// mine no less than 3 blocks in a row
numBlocksProducedByLeader = minimumBlocksForLeaderInRow
}
type stored struct {
pub []byte
epoch uint64
count uint64
shifts uint64 // count how much changes validator per epoch
}
var s stored
s.pub, s.epoch, s.count, s.shifts, _ = bc.LeaderRotationMeta()
if !bytes.Equal(leader.Bytes[:], s.pub) {
s := bc.LeaderRotationMeta()
if !bytes.Equal(leader.Bytes[:], s.Pub) {
// Another leader.
return
}
// if it is the first validator which produce blocks, then it should produce `rest` blocks too.
if s.shifts == 0 {
// If it is the first validator producing blocks, it should also produce the remaining 'rest' of the blocks.
if s.Shifts == 0 {
numBlocksProducedByLeader += rest
}
if s.count < numBlocksProducedByLeader {
// Not enough blocks produced by the leader.
if s.Count < numBlocksProducedByLeader {
// Not enough blocks produced by the leader, continue producing by the same leader.
return
}
// Passed all checks, we can change leader.
Expand All @@ -748,7 +746,7 @@ func (consensus *Consensus) rotateLeader(epoch *big.Int) {
wasFound bool
next *bls.PublicKeyWrapper
)
if bc.Config().IsLeaderRotationExternalValidatorsAllowed(epoch, consensus.ShardID) {
if bc.Config().IsLeaderRotationExternalValidatorsAllowed(epoch) {
wasFound, next = consensus.Decider.NthNextValidator(committee.Slots, leader, 1)
} else {
wasFound, next = consensus.Decider.NthNextHmy(shard.Schedule.InstanceForEpoch(epoch), leader, 1)
Expand Down Expand Up @@ -778,7 +776,7 @@ func (consensus *Consensus) setupForNewConsensus(blk *types.Block, committedMsg
} else {
epoch = blk.Epoch()
}
if consensus.Blockchain().Config().IsLeaderRotation(epoch) {
if consensus.Blockchain().Config().IsLeaderRotationInternalValidators(epoch) {
consensus.rotateLeader(epoch)
}

Expand Down
4 changes: 2 additions & 2 deletions consensus/view_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,8 @@ func (consensus *Consensus) getNextLeaderKey(viewID uint64, committee *shard.Com
// FIXME: rotate leader on harmony nodes only before fully externalization
var wasFound bool
var next *bls.PublicKeyWrapper
if blockchain != nil && blockchain.Config().IsLeaderRotation(epoch) {
if blockchain.Config().IsLeaderRotationExternalValidatorsAllowed(epoch, consensus.ShardID) {
if blockchain != nil && blockchain.Config().IsLeaderRotationInternalValidators(epoch) {
if blockchain.Config().IsLeaderRotationExternalValidatorsAllowed(epoch) {
wasFound, next = consensus.Decider.NthNextValidator(
committee.Slots,
lastLeaderPubKey,
Expand Down
4 changes: 2 additions & 2 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ type BlockChain interface {
//
// After insertion is done, all accumulated events will be fired.
InsertChain(chain types.Blocks, verifyHeaders bool) (int, error)
// LeaderRotationMeta returns the number of continuous blocks by the leader.
LeaderRotationMeta() (publicKeyBytes []byte, epoch, count, shifts uint64, err error)
// LeaderRotationMeta returns info about leader rotation.
LeaderRotationMeta() LeaderRotationMeta
// BadBlocks returns a list of the last 'bad blocks' that
// the client has seen on the network.
BadBlocks() []BadBlock
Expand Down
89 changes: 17 additions & 72 deletions core/blockchain_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ var (
blockWriteTimer = metrics.NewRegisteredTimer("chain/write", nil)

// ErrNoGenesis is the error when there is no genesis.
ErrNoGenesis = errors.New("Genesis not found in chain")
ErrNoGenesis = errors.New("Genesis not found in chain")
ErrEmptyChain = errors.New("empty chain")
// errExceedMaxPendingSlashes ..
errExceedMaxPendingSlashes = errors.New("exceeed max pending slashes")
errNilEpoch = errors.New("nil epoch for voting power computation")
Expand Down Expand Up @@ -222,7 +223,7 @@ type BlockChainImpl struct {
badBlocks *lru.Cache // Bad block cache
pendingSlashes slash.Records
maxGarbCollectedBlkNum int64
leaderRotationMeta leaderRotationMeta
leaderRotationMeta LeaderRotationMeta

options Options
}
Expand Down Expand Up @@ -1632,9 +1633,20 @@ func (bc *BlockChainImpl) InsertChain(chain types.Blocks, verifyHeaders bool) (i
return len(chain), nil
}

for _, b := range chain {
// check if blocks already exist
if bc.HasBlock(b.Hash(), b.NumberU64()) {
return 0, errors.Wrapf(ErrKnownBlock, "block %s %d already exists", b.Hash().Hex(), b.NumberU64())
}
}

prevHash := bc.CurrentBlock().Hash()
n, events, logs, err := bc.insertChain(chain, verifyHeaders)
bc.PostChainEvents(events, logs)
if err == nil {
if prevHash == bc.CurrentBlock().Hash() {
panic("insertChain failed to update current block")
}
// there should be only 1 block.
for _, b := range chain {
if b.Epoch().Uint64() > 0 {
Expand All @@ -1653,75 +1665,8 @@ func (bc *BlockChainImpl) InsertChain(chain types.Blocks, verifyHeaders bool) (i
return n, err
}

// buildLeaderRotationMeta builds leader rotation meta if feature is activated.
func (bc *BlockChainImpl) buildLeaderRotationMeta(curHeader *block.Header) error {
if !bc.chainConfig.IsLeaderRotation(curHeader.Epoch()) {
return nil
}
if curHeader.NumberU64() == 0 {
return errors.New("current header is genesis")
}
curPubKey, err := bc.getLeaderPubKeyFromCoinbase(curHeader)
if err != nil {
return err
}
for i := curHeader.NumberU64() - 1; i >= 0; i-- {
header := bc.GetHeaderByNumber(i)
if header == nil {
return errors.New("header is nil")
}
blockPubKey, err := bc.getLeaderPubKeyFromCoinbase(header)
if err != nil {
return err
}
if curPubKey.Bytes != blockPubKey.Bytes || curHeader.Epoch().Uint64() != header.Epoch().Uint64() {
for j := i; j <= curHeader.NumberU64(); j++ {
header := bc.GetHeaderByNumber(j)
if header == nil {
return errors.New("header is nil")
}
err := bc.saveLeaderRotationMeta(header)
if err != nil {
utils.Logger().Error().Err(err).Msg("save leader continuous blocks count error")
return err
}
}
return nil
}
}
return errors.New("no leader rotation meta to save")
}

func (bc *BlockChainImpl) saveLeaderRotationMeta(h *block.Header) error {
blockPubKey, err := bc.getLeaderPubKeyFromCoinbase(h)
if err != nil {
return err
}

var s = bc.leaderRotationMeta

// increase counter only if the same leader and epoch
if bytes.Equal(s.pub, blockPubKey.Bytes[:]) && s.epoch == h.Epoch().Uint64() {
s.count++
} else {
s.count = 1
}
// we should increase shifts if the leader is changed.
if !bytes.Equal(s.pub, blockPubKey.Bytes[:]) {
s.shifts++
}
// but set to zero if new
if s.epoch != h.Epoch().Uint64() {
s.shifts = 0
}
s.epoch = h.Epoch().Uint64()
bc.leaderRotationMeta = s

return nil
}

func (bc *BlockChainImpl) LeaderRotationMeta() (publicKeyBytes []byte, epoch, count, shifts uint64, err error) {
return rawdb.ReadLeaderRotationMeta(bc.db)
func (bc *BlockChainImpl) LeaderRotationMeta() LeaderRotationMeta {
return bc.leaderRotationMeta.Clone()
}

// insertChain will execute the actual chain insertion and event aggregation. The
Expand All @@ -1730,7 +1675,7 @@ func (bc *BlockChainImpl) LeaderRotationMeta() (publicKeyBytes []byte, epoch, co
func (bc *BlockChainImpl) insertChain(chain types.Blocks, verifyHeaders bool) (int, []interface{}, []*types.Log, error) {
// Sanity check that we have something meaningful to import
if len(chain) == 0 {
return 0, nil, nil, nil
return 0, nil, nil, ErrEmptyChain
}
// Do a sanity check that the provided chain is actually ordered and linked
for i := 1; i < len(chain); i++ {
Expand Down
Loading

0 comments on commit 1633656

Please sign in to comment.