diff --git a/api/service/blockproposal/service.go b/api/service/blockproposal/service.go index a1bc397d96..8330d6367e 100644 --- a/api/service/blockproposal/service.go +++ b/api/service/blockproposal/service.go @@ -10,14 +10,14 @@ import ( type Service struct { stopChan chan struct{} stoppedChan chan struct{} - c *consensus.Consensus + c *consensus.Proposer messageChan chan *msg_pb.Message } // New returns a block proposal service. -func New(c *consensus.Consensus) *Service { +func New(proposer *consensus.Proposer) *Service { return &Service{ - c: c, + c: proposer, stopChan: make(chan struct{}), stoppedChan: make(chan struct{}), } diff --git a/consensus/consensus_block_proposing.go b/consensus/consensus_block_proposing.go index ab0f25ae0f..80cc09fe29 100644 --- a/consensus/consensus_block_proposing.go +++ b/consensus/consensus_block_proposing.go @@ -9,9 +9,7 @@ import ( "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core/rawdb" "github.com/harmony-one/harmony/core/types" - "github.com/harmony-one/harmony/crypto/bls" "github.com/harmony-one/harmony/internal/utils" - "github.com/harmony-one/harmony/node/harmony/worker" "github.com/harmony-one/harmony/shard" staking "github.com/harmony-one/harmony/staking/types" "github.com/pkg/errors" @@ -408,88 +406,3 @@ func (consensus *Consensus) PendingCXReceipts() []*types.CXReceiptsProof { } return cxReceipts } - -// WaitForConsensusReadyV2 listen for the readiness signal from consensus and generate new block for consensus. -// only leader will receive the ready signal -func (consensus *Consensus) WaitForConsensusReadyV2(stopChan chan struct{}, stoppedChan chan struct{}) { - go func() { - // Setup stoppedChan - defer close(stoppedChan) - - utils.Logger().Debug(). - Msg("Waiting for Consensus ready") - select { - case <-time.After(30 * time.Second): - case <-stopChan: - return - } - - for { - // keep waiting for Consensus ready - select { - case <-stopChan: - utils.Logger().Warn(). - Msg("Consensus new block proposal: STOPPED!") - return - case proposal := <-consensus.GetReadySignal(): - for retryCount := 0; retryCount < 3 && consensus.IsLeader(); retryCount++ { - time.Sleep(SleepPeriod) - utils.Logger().Info(). - Uint64("blockNum", consensus.Blockchain().CurrentBlock().NumberU64()+1). - Bool("asyncProposal", proposal.Type == AsyncProposal). - Str("called", proposal.Caller). - Msg("PROPOSING NEW BLOCK ------------------------------------------------") - - // Prepare last commit signatures - newCommitSigsChan := make(chan []byte) - - go func() { - waitTime := 0 * time.Second - if proposal.Type == AsyncProposal { - waitTime = worker.CommitSigReceiverTimeout - } - select { - case <-time.After(waitTime): - if waitTime == 0 { - utils.Logger().Info().Msg("[ProposeNewBlock] Sync block proposal, reading commit sigs directly from DB") - } else { - utils.Logger().Info().Msg("[ProposeNewBlock] Timeout waiting for commit sigs, reading directly from DB") - } - sigs, err := consensus.BlockCommitSigs(consensus.Blockchain().CurrentBlock().NumberU64()) - - if err != nil { - utils.Logger().Error().Err(err).Msg("[ProposeNewBlock] Cannot get commit signatures from last block") - } else { - newCommitSigsChan <- sigs - } - case commitSigs := <-consensus.GetCommitSigChannel(): - utils.Logger().Info().Msg("[ProposeNewBlock] received commit sigs asynchronously") - if len(commitSigs) > bls.BLSSignatureSizeInBytes { - newCommitSigsChan <- commitSigs - } - } - }() - newBlock, err := consensus.ProposeNewBlock(newCommitSigsChan) - if err == nil { - utils.Logger().Info(). - Uint64("blockNum", newBlock.NumberU64()). - Uint64("epoch", newBlock.Epoch().Uint64()). - Uint64("viewID", newBlock.Header().ViewID().Uint64()). - Int("numTxs", newBlock.Transactions().Len()). - Int("numStakingTxs", newBlock.StakingTransactions().Len()). - Int("crossShardReceipts", newBlock.IncomingReceipts().Len()). - Msgf("=========Successfully Proposed New Block, shard: %d epoch: %d number: %d ==========", newBlock.ShardID(), newBlock.Epoch().Uint64(), newBlock.NumberU64()) - - // Send the new block to Consensus so it can be confirmed. - consensus.BlockChannel(newBlock) - break - } else { - utils.Logger().Err(err).Int("retryCount", retryCount). - Msg("!!!!!!!!!Failed Proposing New Block!!!!!!!!!") - continue - } - } - } - } - }() -} diff --git a/consensus/proposer.go b/consensus/proposer.go new file mode 100644 index 0000000000..6f60887ecf --- /dev/null +++ b/consensus/proposer.go @@ -0,0 +1,103 @@ +package consensus + +import ( + "time" + + "github.com/harmony-one/harmony/crypto/bls" + "github.com/harmony-one/harmony/internal/utils" + "github.com/harmony-one/harmony/node/harmony/worker" +) + +type Proposer struct { + consensus *Consensus +} + +func NewProposer(consensus *Consensus) *Proposer { + return &Proposer{consensus} +} + +// WaitForConsensusReadyV2 listen for the readiness signal from consensus and generate new block for consensus. +// only leader will receive the ready signal +func (p *Proposer) WaitForConsensusReadyV2(stopChan chan struct{}, stoppedChan chan struct{}) { + consensus := p.consensus + go func() { + // Setup stoppedChan + defer close(stoppedChan) + + utils.Logger().Debug(). + Msg("Waiting for Consensus ready") + select { + case <-time.After(30 * time.Second): + case <-stopChan: + return + } + + for { + // keep waiting for Consensus ready + select { + case <-stopChan: + utils.Logger().Warn(). + Msg("Consensus new block proposal: STOPPED!") + return + case proposal := <-consensus.GetReadySignal(): + for retryCount := 0; retryCount < 3 && consensus.IsLeader(); retryCount++ { + time.Sleep(SleepPeriod) + utils.Logger().Info(). + Uint64("blockNum", consensus.Blockchain().CurrentBlock().NumberU64()+1). + Bool("asyncProposal", proposal.Type == AsyncProposal). + Str("called", proposal.Caller). + Msg("PROPOSING NEW BLOCK ------------------------------------------------") + + // Prepare last commit signatures + newCommitSigsChan := make(chan []byte) + + go func() { + waitTime := 0 * time.Second + if proposal.Type == AsyncProposal { + waitTime = worker.CommitSigReceiverTimeout + } + select { + case <-time.After(waitTime): + if waitTime == 0 { + utils.Logger().Info().Msg("[ProposeNewBlock] Sync block proposal, reading commit sigs directly from DB") + } else { + utils.Logger().Info().Msg("[ProposeNewBlock] Timeout waiting for commit sigs, reading directly from DB") + } + sigs, err := consensus.BlockCommitSigs(consensus.Blockchain().CurrentBlock().NumberU64()) + + if err != nil { + utils.Logger().Error().Err(err).Msg("[ProposeNewBlock] Cannot get commit signatures from last block") + } else { + newCommitSigsChan <- sigs + } + case commitSigs := <-consensus.GetCommitSigChannel(): + utils.Logger().Info().Msg("[ProposeNewBlock] received commit sigs asynchronously") + if len(commitSigs) > bls.BLSSignatureSizeInBytes { + newCommitSigsChan <- commitSigs + } + } + }() + newBlock, err := consensus.ProposeNewBlock(newCommitSigsChan) + if err == nil { + utils.Logger().Info(). + Uint64("blockNum", newBlock.NumberU64()). + Uint64("epoch", newBlock.Epoch().Uint64()). + Uint64("viewID", newBlock.Header().ViewID().Uint64()). + Int("numTxs", newBlock.Transactions().Len()). + Int("numStakingTxs", newBlock.StakingTransactions().Len()). + Int("crossShardReceipts", newBlock.IncomingReceipts().Len()). + Msgf("=========Successfully Proposed New Block, shard: %d epoch: %d number: %d ==========", newBlock.ShardID(), newBlock.Epoch().Uint64(), newBlock.NumberU64()) + + // Send the new block to Consensus so it can be confirmed. + consensus.BlockChannel(newBlock) + break + } else { + utils.Logger().Err(err).Int("retryCount", retryCount). + Msg("!!!!!!!!!Failed Proposing New Block!!!!!!!!!") + continue + } + } + } + } + }() +} diff --git a/node/harmony/service_setup.go b/node/harmony/service_setup.go index 223e7341dd..e5b27c7dbc 100644 --- a/node/harmony/service_setup.go +++ b/node/harmony/service_setup.go @@ -5,8 +5,9 @@ import ( "github.com/harmony-one/harmony/api/service" "github.com/harmony-one/harmony/api/service/blockproposal" - "github.com/harmony-one/harmony/api/service/consensus" + srv "github.com/harmony-one/harmony/api/service/consensus" "github.com/harmony-one/harmony/api/service/explorer" + "github.com/harmony-one/harmony/consensus" ) // RegisterValidatorServices register the validator services. @@ -14,12 +15,12 @@ func (node *Node) RegisterValidatorServices() { // Register consensus service. node.serviceManager.Register( service.Consensus, - consensus.New(node.Consensus), + srv.New(node.Consensus), ) // Register new block service. node.serviceManager.Register( service.BlockProposal, - blockproposal.New(node.Consensus), + blockproposal.New(consensus.NewProposer(node.Consensus)), ) }