Skip to content

Commit

Permalink
Merge pull request #6526 from onflow/yurii/6517-proposal-refactoring
Browse files Browse the repository at this point in the history
[BFT] `model.Proposal` refactoring
  • Loading branch information
AlexHentschel authored Oct 26, 2024
2 parents c1a1cc0 + 78dd10d commit be15d03
Show file tree
Hide file tree
Showing 81 changed files with 384 additions and 395 deletions.
4 changes: 2 additions & 2 deletions consensus/hotstuff/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type VoteAggregationViolationConsumer interface {
// Prerequisites:
// Implementation must be concurrency safe; Non-blocking;
// and must handle repetition of the same events (with some processing overhead).
OnVoteForInvalidBlockDetected(vote *model.Vote, invalidProposal *model.Proposal)
OnVoteForInvalidBlockDetected(vote *model.Vote, invalidProposal *model.SignedProposal)
}

// TimeoutAggregationViolationConsumer consumes outbound notifications about Active Pacemaker violations specifically
Expand Down Expand Up @@ -138,7 +138,7 @@ type ParticipantConsumer interface {
// Prerequisites:
// Implementation must be concurrency safe; Non-blocking;
// and must handle repetition of the same events (with some processing overhead).
OnReceiveProposal(currentView uint64, proposal *model.Proposal)
OnReceiveProposal(currentView uint64, proposal *model.SignedProposal)

// OnReceiveQc notifications are produced by the EventHandler when it starts processing a
// QuorumCertificate [QC] constructed by the node's internal vote aggregator.
Expand Down
2 changes: 1 addition & 1 deletion consensus/hotstuff/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type EventHandler interface {
// consensus participant.
// All inputs should be validated before feeding into this function. Assuming trusted data.
// No errors are expected during normal operation.
OnReceiveProposal(proposal *model.Proposal) error
OnReceiveProposal(proposal *model.SignedProposal) error

// OnLocalTimeout handles a local timeout event by creating a model.TimeoutObject and broadcasting it.
// No errors are expected during normal operation.
Expand Down
8 changes: 4 additions & 4 deletions consensus/hotstuff/eventhandler/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (e *EventHandler) OnReceiveTc(tc *flow.TimeoutCertificate) error {
// consensus participant.
// All inputs should be validated before feeding into this function. Assuming trusted data.
// No errors are expected during normal operation.
func (e *EventHandler) OnReceiveProposal(proposal *model.Proposal) error {
func (e *EventHandler) OnReceiveProposal(proposal *model.SignedProposal) error {
block := proposal.Block
curView := e.paceMaker.CurView()
log := e.log.With().
Expand Down Expand Up @@ -429,7 +429,7 @@ func (e *EventHandler) proposeForNewViewIfPrimary() error {
lastViewTC = nil
}

// Construct Own Proposal
// Construct Own SignedProposal
// CAUTION, design constraints:
// (i) We cannot process our own proposal within the `EventHandler` right away.
// (ii) We cannot add our own proposal to Forks here right away.
Expand Down Expand Up @@ -491,7 +491,7 @@ func (e *EventHandler) proposeForNewViewIfPrimary() error {
// It is called AFTER the block has been stored or found in Forks
// It checks whether to vote for this block.
// No errors are expected during normal operation.
func (e *EventHandler) processBlockForCurrentView(proposal *model.Proposal) error {
func (e *EventHandler) processBlockForCurrentView(proposal *model.SignedProposal) error {
// sanity check that block is really for the current view:
curView := e.paceMaker.CurView()
block := proposal.Block
Expand Down Expand Up @@ -526,7 +526,7 @@ func (e *EventHandler) processBlockForCurrentView(proposal *model.Proposal) erro
// ownVote generates and forwards the own vote, if we decide to vote.
// Any errors are potential symptoms of uncovered edge cases or corrupted internal state (fatal).
// No errors are expected during normal operation.
func (e *EventHandler) ownVote(proposal *model.Proposal, curView uint64, nextLeader flow.Identifier) error {
func (e *EventHandler) ownVote(proposal *model.SignedProposal, curView uint64, nextLeader flow.Identifier) error {
block := proposal.Block
log := e.log.With().
Uint64("block_view", block.View).
Expand Down
29 changes: 12 additions & 17 deletions consensus/hotstuff/eventhandler/event_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,14 +132,14 @@ func NewSafetyRules(t *testing.T) *SafetyRules {

// SafetyRules will not vote for any block, unless the blockID exists in votable map
safetyRules.On("ProduceVote", mock.Anything, mock.Anything).Return(
func(block *model.Proposal, _ uint64) *model.Vote {
func(block *model.SignedProposal, _ uint64) *model.Vote {
_, ok := safetyRules.votable[block.Block.BlockID]
if !ok {
return nil
}
return createVote(block.Block)
},
func(block *model.Proposal, _ uint64) error {
func(block *model.SignedProposal, _ uint64) error {
_, ok := safetyRules.votable[block.Block.BlockID]
if !ok {
return model.NewNoVoteErrorf("block not found")
Expand Down Expand Up @@ -179,7 +179,7 @@ func NewForks(t *testing.T, finalized uint64) *Forks {
}

f.On("AddValidatedBlock", mock.Anything).Return(func(proposal *model.Block) error {
log.Info().Msgf("forks.AddValidatedBlock received Proposal for view: %v, QC: %v\n", proposal.View, proposal.QC.View)
log.Info().Msgf("forks.AddValidatedBlock received Block proposal for view: %v, QC: %v\n", proposal.View, proposal.QC.View)
return f.addProposal(proposal)
}).Maybe()

Expand Down Expand Up @@ -228,14 +228,12 @@ type BlockProducer struct {
}

func (b *BlockProducer) MakeBlockProposal(view uint64, qc *flow.QuorumCertificate, lastViewTC *flow.TimeoutCertificate) (*flow.Header, error) {
return model.ProposalToFlow(&model.Proposal{
Block: helper.MakeBlock(
return helper.SignedProposalToFlow(helper.MakeSignedProposal(helper.WithProposal(
helper.MakeProposal(helper.WithBlock(helper.MakeBlock(
helper.WithBlockView(view),
helper.WithBlockQC(qc),
helper.WithBlockProposer(b.proposerID),
),
LastViewTC: lastViewTC,
}), nil
helper.WithBlockProposer(b.proposerID))),
helper.WithLastViewTC(lastViewTC))))), nil
}

func TestEventHandler(t *testing.T) {
Expand All @@ -258,8 +256,8 @@ type EventHandlerSuite struct {

initView uint64 // the current view at the beginning of the test case
endView uint64 // the expected current view at the end of the test case
parentProposal *model.Proposal
votingProposal *model.Proposal
parentProposal *model.SignedProposal
votingProposal *model.SignedProposal
qc *flow.QuorumCertificate
tc *flow.TimeoutCertificate
newview *model.NewViewEvent
Expand Down Expand Up @@ -670,7 +668,7 @@ func (es *EventHandlerSuite) TestOnReceiveTc_NextLeaderProposes() {

// proposed block should contain valid newest QC and lastViewTC
expectedNewestQC := es.paceMaker.NewestQC()
proposal := model.ProposalFromFlow(header)
proposal := model.SignedProposalFromFlow(header)
require.Equal(es.T(), expectedNewestQC, proposal.Block.QC)
require.Equal(es.T(), es.paceMaker.LastViewTC(), proposal.LastViewTC)
}).Once()
Expand Down Expand Up @@ -1033,10 +1031,7 @@ func createVote(block *model.Block) *model.Vote {
}
}

func createProposal(view uint64, qcview uint64) *model.Proposal {
func createProposal(view uint64, qcview uint64) *model.SignedProposal {
block := createBlockWithQC(view, qcview)
return &model.Proposal{
Block: block,
SigData: nil,
}
return helper.MakeSignedProposal(helper.WithProposal(helper.MakeProposal(helper.WithBlock(block))))
}
4 changes: 2 additions & 2 deletions consensus/hotstuff/eventloop/event_loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
// it contains an attached insertionTime that is used to measure how long we have waited between queening proposal and
// actually processing by `EventHandler`.
type queuedProposal struct {
proposal *model.Proposal
proposal *model.SignedProposal
insertionTime time.Time
}

Expand Down Expand Up @@ -263,7 +263,7 @@ func (el *EventLoop) loop(ctx context.Context) error {
}

// SubmitProposal pushes the received block to the proposals channel
func (el *EventLoop) SubmitProposal(proposal *model.Proposal) {
func (el *EventLoop) SubmitProposal(proposal *model.SignedProposal) {
queueItem := queuedProposal{
proposal: proposal,
insertionTime: time.Now(),
Expand Down
8 changes: 4 additions & 4 deletions consensus/hotstuff/eventloop/event_loop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (s *EventLoopTestSuite) TestReadyDone() {

// Test_SubmitQC tests that submitted proposal is eventually sent to event handler for processing
func (s *EventLoopTestSuite) Test_SubmitProposal() {
proposal := helper.MakeProposal()
proposal := helper.MakeSignedProposal()
processed := atomic.NewBool(false)
s.eh.On("OnReceiveProposal", proposal).Run(func(args mock.Arguments) {
processed.Store(true)
Expand Down Expand Up @@ -229,7 +229,7 @@ func TestEventLoop_Timeout(t *testing.T) {
go func() {
defer wg.Done()
for !processed.Load() {
eventLoop.SubmitProposal(helper.MakeProposal())
eventLoop.SubmitProposal(helper.MakeSignedProposal())
}
}()

Expand Down Expand Up @@ -258,7 +258,7 @@ func TestReadyDoneWithStartTime(t *testing.T) {
require.NoError(t, err)

done := make(chan struct{})
eh.On("OnReceiveProposal", mock.AnythingOfType("*model.Proposal")).Run(func(args mock.Arguments) {
eh.On("OnReceiveProposal", mock.AnythingOfType("*model.SignedProposal")).Run(func(args mock.Arguments) {
require.True(t, time.Now().After(startTime))
close(done)
}).Return(nil).Once()
Expand All @@ -271,7 +271,7 @@ func TestReadyDoneWithStartTime(t *testing.T) {

parentBlock := unittest.BlockHeaderFixture()
block := unittest.BlockHeaderWithParentFixture(parentBlock)
eventLoop.SubmitProposal(model.ProposalFromFlow(block))
eventLoop.SubmitProposal(model.SignedProposalFromFlow(block))

unittest.RequireCloseBefore(t, done, startTimeDuration+100*time.Millisecond, "proposal wasn't received")
cancel()
Expand Down
1 change: 0 additions & 1 deletion consensus/hotstuff/forks/block_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ func (bb *BlockBuilder) Proposals() ([]*model.Proposal, error) {
PayloadHash: payloadHash,
},
LastViewTC: lastViewTC,
SigData: nil,
}
proposal.Block.BlockID = makeBlockID(proposal.Block)

Expand Down
2 changes: 1 addition & 1 deletion consensus/hotstuff/forks/forks.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ func (f *Forks) checkForAdvancingFinalization(certifiedBlock *model.CertifiedBlo
parentBlock := parentVertex.(*BlockContainer).Block()

// Note: we assume that all stored blocks pass Forks.EnsureBlockIsValidExtension(block);
// specifically, that Proposal's ViewNumber is strictly monotonically
// specifically, that block's ViewNumber is strictly monotonically
// increasing which is enforced by LevelledForest.VerifyVertex(...)
// We denote:
// * a DIRECT 1-chain as '<-'
Expand Down
6 changes: 3 additions & 3 deletions consensus/hotstuff/forks/forks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func TestFinalize_Multiple2Chains(t *testing.T) {
}

// TestFinalize_OrphanedFork tests that we can finalize a block which causes a conflicting fork to be orphaned.
// We ingest the the following block tree:
// We ingest the following block tree:
//
// [◄(1) 2] [◄(2) 3]
// [◄(2) 4] [◄(4) 5] [◄(5) 6]
Expand Down Expand Up @@ -389,7 +389,7 @@ func TestIgnoreBlocksBelowFinalizedView(t *testing.T) {
}

// TestDoubleProposal tests that the DoubleProposal notification is emitted when two different
// blocks for the same view are added. We ingest the the following block tree:
// blocks for the same view are added. We ingest the following block tree:
//
// / [◄(1) 2]
// [1]
Expand Down Expand Up @@ -460,7 +460,7 @@ func TestConflictingQCs(t *testing.T) {
}

// TestConflictingFinalizedForks checks that finalizing 2 conflicting forks should return model.ByzantineThresholdExceededError
// We ingest the the following block tree:
// We ingest the following block tree:
//
// [◄(1) 2] [◄(2) 3] [◄(3) 4] [◄(4) 5]
// [◄(2) 6] [◄(6) 7] [◄(7) 8]
Expand Down
51 changes: 47 additions & 4 deletions consensus/hotstuff/helper/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,25 +56,42 @@ func WithBlockQC(qc *flow.QuorumCertificate) func(*model.Block) {
}
}

func MakeSignedProposal(options ...func(*model.SignedProposal)) *model.SignedProposal {
proposal := &model.SignedProposal{
Proposal: *MakeProposal(),
SigData: unittest.SignatureFixture(),
}
for _, option := range options {
option(proposal)
}
return proposal
}

func MakeProposal(options ...func(*model.Proposal)) *model.Proposal {
proposal := &model.Proposal{
Block: MakeBlock(),
SigData: unittest.SignatureFixture(),
Block: MakeBlock(),
LastViewTC: nil,
}
for _, option := range options {
option(proposal)
}
return proposal
}

func WithProposal(proposal *model.Proposal) func(*model.SignedProposal) {
return func(signedProposal *model.SignedProposal) {
signedProposal.Proposal = *proposal
}
}

func WithBlock(block *model.Block) func(*model.Proposal) {
return func(proposal *model.Proposal) {
proposal.Block = block
}
}

func WithSigData(sigData []byte) func(*model.Proposal) {
return func(proposal *model.Proposal) {
func WithSigData(sigData []byte) func(*model.SignedProposal) {
return func(proposal *model.SignedProposal) {
proposal.SigData = sigData
}
}
Expand All @@ -84,3 +101,29 @@ func WithLastViewTC(lastViewTC *flow.TimeoutCertificate) func(*model.Proposal) {
proposal.LastViewTC = lastViewTC
}
}

// SignedProposalToFlow turns a block proposal into a flow header.
//
// CAUTION: This function is only suitable for TESTING purposes ONLY.
// In the conversion from `flow.Header` to HoStuff's `model.Block` we loose information
// (e.g. `ChainID` and `Height` are not included in `model.Block`) and hence the conversion
// is *not reversible*. This is on purpose, because we wanted to only expose data to
// HotStuff that HotStuff really needs.
func SignedProposalToFlow(proposal *model.SignedProposal) *flow.Header {

block := proposal.Block
header := &flow.Header{
ParentID: block.QC.BlockID,
PayloadHash: block.PayloadHash,
Timestamp: block.Timestamp,
View: block.View,
ParentView: block.QC.View,
ParentVoterIndices: block.QC.SignerIndices,
ParentVoterSigData: block.QC.SigData,
ProposerID: block.ProposerID,
ProposerSigData: proposal.SigData,
LastViewTC: proposal.LastViewTC,
}

return header
}
2 changes: 1 addition & 1 deletion consensus/hotstuff/integration/connect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func Connect(t *testing.T, instances []*Instance) {
}

// convert into proposal immediately
proposal := model.ProposalFromFlow(header)
proposal := model.SignedProposalFromFlow(header)

// store locally and loop back to engine for processing
sender.ProcessBlock(proposal)
Expand Down
18 changes: 9 additions & 9 deletions consensus/hotstuff/integration/filters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

// VoteFilter is a filter function for dropping Votes.
// Return value `true` implies that the the given Vote should be
// Return value `true` implies that the given Vote should be
// dropped, while `false` indicates that the Vote should be received.
type VoteFilter func(*model.Vote) bool

Expand All @@ -34,34 +34,34 @@ func BlockVotesBy(voterID flow.Identifier) VoteFilter {
}

// ProposalFilter is a filter function for dropping Proposals.
// Return value `true` implies that the the given Proposal should be
// dropped, while `false` indicates that the Proposal should be received.
type ProposalFilter func(*model.Proposal) bool
// Return value `true` implies that the given SignedProposal should be
// dropped, while `false` indicates that the SignedProposal should be received.
type ProposalFilter func(*model.SignedProposal) bool

func BlockNoProposals(*model.Proposal) bool {
func BlockNoProposals(*model.SignedProposal) bool {
return false
}

func BlockAllProposals(*model.Proposal) bool {
func BlockAllProposals(*model.SignedProposal) bool {
return true
}

// BlockProposalRandomly drops proposals randomly with a probability of `dropProbability` ∈ [0,1]
func BlockProposalRandomly(dropProbability float64) ProposalFilter {
return func(*model.Proposal) bool {
return func(*model.SignedProposal) bool {
return rand.Float64() < dropProbability
}
}

// BlockProposalsBy drops all proposals originating from the specified `proposerID`
func BlockProposalsBy(proposerID flow.Identifier) ProposalFilter {
return func(proposal *model.Proposal) bool {
return func(proposal *model.SignedProposal) bool {
return proposal.Block.ProposerID == proposerID
}
}

// TimeoutObjectFilter is a filter function for dropping TimeoutObjects.
// Return value `true` implies that the the given TimeoutObject should be
// Return value `true` implies that the given TimeoutObject should be
// dropped, while `false` indicates that the TimeoutObject should be received.
type TimeoutObjectFilter func(*model.TimeoutObject) bool

Expand Down
Loading

0 comments on commit be15d03

Please sign in to comment.