Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use SafetyData to avoid double-proposing instead of myLastProposedView #6921

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
3dbab0b
remove outdated comment
jordanschalm Jan 22, 2025
81d76a1
set myLastProposedView when starting event handler
jordanschalm Jan 22, 2025
84b4d86
cache Persister values
jordanschalm Jan 22, 2025
396331c
return error from persister.New
jordanschalm Jan 22, 2025
3d13e71
remove myLastProposedView field
jordanschalm Jan 22, 2025
d549e11
initialize mutex field
jordanschalm Jan 22, 2025
76fc6c1
works better this way
jordanschalm Jan 23, 2025
848e0df
update error handling of block producer
jordanschalm Jan 23, 2025
019f996
add err handling to event handler
jordanschalm Jan 23, 2025
9b9b7c3
add builder error passthrough docs and tests
jordanschalm Jan 24, 2025
4aef784
event handler test: block producer doesn't produce twice
jordanschalm Jan 24, 2025
cef6002
rm error return from persister
jordanschalm Jan 24, 2025
ec8306d
mock gen
jordanschalm Jan 24, 2025
c8dc654
persister mock
jordanschalm Jan 24, 2025
b2b61fb
undo unecessary changes to integration mocks
jordanschalm Jan 24, 2025
45e11bb
revert Persister changes
jordanschalm Jan 28, 2025
78e76d8
update util cmd
jordanschalm Jan 28, 2025
c90b792
mocks
jordanschalm Jan 28, 2025
ad720f6
Apply suggestions from code review
jordanschalm Jan 28, 2025
9c7f2c4
update builder docs
jordanschalm Jan 28, 2025
9c74f6c
error handling
jordanschalm Jan 28, 2025
2635a5c
Merge branch 'jord/hotstuff-safety-data-persist-bug' of github.com:on…
jordanschalm Jan 28, 2025
11e6059
explicitly test for NoVoteError
jordanschalm Jan 28, 2025
2f9cde3
update scaffold - ping service
jordanschalm Jan 28, 2025
46e031b
fix import
jordanschalm Jan 28, 2025
a1bd4ae
Apply suggestions from code review
jordanschalm Jan 28, 2025
587bcf9
address review feedback
jordanschalm Jan 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions cmd/consensus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,10 @@ func main() {
notifier.AddFollowerConsumer(followerDistributor)

// initialize the persister
persist := persister.New(node.DB, node.RootChainID)
persist, err := persister.New(node.DB, node.RootChainID)
if err != nil {
return nil, err
}

finalizedBlock, err := node.State.Final().Head()
if err != nil {
Expand Down Expand Up @@ -710,7 +713,7 @@ func main() {
Component("block rate cruise control", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
livenessData, err := hotstuffModules.Persist.GetLivenessData()
if err != nil {
return nil, err
return nil, fmt.Errorf("could not load liveness data: %w", err)
}
ctl, err := cruisectl.NewBlockTimeController(node.Logger, metrics.NewCruiseCtlMetrics(), cruiseCtlConfig, node.State, livenessData.CurrentView)
if err != nil {
Expand Down
44 changes: 25 additions & 19 deletions cmd/scaffold.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,27 @@ func (fnb *FlowNodeBuilder) EnqueuePingService() {
fnb.Component("ping service", func(node *NodeConfig) (module.ReadyDoneAware, error) {
pingLibP2PProtocolID := protocols.PingProtocolId(node.SporkID)

var hotstuffViewFunc func() (uint64, error)
// Setup consensus nodes to report their HotStuff view
if fnb.BaseConfig.NodeRole == flow.RoleConsensus.String() {
hotstuffReader, err := persister.NewReader(node.DB, node.RootChainID)
if err != nil {
return nil, err
}
hotstuffViewFunc = func() (uint64, error) {
livenessData, err := hotstuffReader.GetLivenessData()
if err != nil {
return 0, fmt.Errorf("could not get liveness data: %w", err)
}
return livenessData.CurrentView, nil
}
} else {
// All other node roles do not report their hotstuff view
hotstuffViewFunc = func() (uint64, error) {
return 0, fmt.Errorf("hotstuff view reporting disabled")
}
}

// setup the Ping provider to return the software version and the sealed block height
pingInfoProvider := &ping.InfoProvider{
SoftwareVersionFun: func() string {
Expand All @@ -284,28 +305,13 @@ func (fnb *FlowNodeBuilder) EnqueuePingService() {
}
return head.Height, nil
},
HotstuffViewFun: func() (uint64, error) {
return 0, fmt.Errorf("hotstuff view reporting disabled")
},
}

// only consensus roles will need to report hotstuff view
if fnb.BaseConfig.NodeRole == flow.RoleConsensus.String() {
// initialize the persister
persist := persister.New(node.DB, node.RootChainID)

pingInfoProvider.HotstuffViewFun = func() (uint64, error) {
livenessData, err := persist.GetLivenessData()
if err != nil {
return 0, err
}

return livenessData.CurrentView, nil
}
HotstuffViewFun: hotstuffViewFunc,
}

pingService, err := node.EngineRegistry.RegisterPingService(pingLibP2PProtocolID, pingInfoProvider)

if err != nil {
return nil, fmt.Errorf("could not register ping service: %w", err)
}
node.PingService = pingService

return &module.NoopReadyDoneAware{}, err
Expand Down
6 changes: 5 additions & 1 deletion cmd/util/cmd/read-hotstuff/cmd/get_liveness.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/spf13/cobra"

"github.com/onflow/flow-go/cmd/util/cmd/common"
"github.com/onflow/flow-go/consensus/hotstuff/persister"
)

var GetLivenessCmd = &cobra.Command{
Expand All @@ -28,7 +29,10 @@ func runGetLivenessData(*cobra.Command, []string) {
}

rootBlock := state.Params().FinalizedRoot()
reader := NewHotstuffReader(db, rootBlock.ChainID)
reader, err := persister.NewReader(db, rootBlock.ChainID)
if err != nil {
log.Fatal().Err(err).Msg("could not create reader from db")
}

log.Info().Msg("getting hotstuff liveness data")

Expand Down
6 changes: 5 additions & 1 deletion cmd/util/cmd/read-hotstuff/cmd/get_safety.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/spf13/cobra"

"github.com/onflow/flow-go/cmd/util/cmd/common"
"github.com/onflow/flow-go/consensus/hotstuff/persister"
)

var GetSafetyCmd = &cobra.Command{
Expand All @@ -29,7 +30,10 @@ func runGetSafetyData(*cobra.Command, []string) {

rootBlock := state.Params().FinalizedRoot()

reader := NewHotstuffReader(db, rootBlock.ChainID)
reader, err := persister.NewReader(db, rootBlock.ChainID)
if err != nil {
log.Fatal().Err(err).Msg("could not create reader from db")
}

log.Info().Msg("getting hotstuff safety data")

Expand Down
25 changes: 0 additions & 25 deletions cmd/util/cmd/read-hotstuff/cmd/reader.go

This file was deleted.

7 changes: 5 additions & 2 deletions consensus/hotstuff/block_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ import (
// concurrent signing requests, we enable various optimizations of optimistic and/or upfront block production.
type BlockProducer interface {
// MakeBlockProposal builds a new HotStuff block proposal using the given view,
// the given quorum certificate for its parent and [optionally] a timeout certificate for last view(could be nil).
// No errors are expected during normal operation.
// the given quorum certificate for its parent and [optionally] a timeout certificate for last view (could be nil).
// Error Returns:
// - model.NoVoteError if it is not safe for us to vote (our proposal includes our vote)
// for this view. This can happen if we have already proposed or timed out this view.
// - generic error in case of unexpected failure
MakeBlockProposal(view uint64, qc *flow.QuorumCertificate, lastViewTC *flow.TimeoutCertificate) (*flow.Header, error)
}
18 changes: 15 additions & 3 deletions consensus/hotstuff/blockproducer/block_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"fmt"

"github.com/onflow/flow-go/consensus/hotstuff"
"github.com/onflow/flow-go/consensus/hotstuff/model"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/irrecoverable"
)

// BlockProducer is responsible for producing new block proposals. It is a service component to HotStuff's
Expand Down Expand Up @@ -42,7 +44,10 @@ func New(safetyRules hotstuff.SafetyRules, committee hotstuff.Replicas, builder

// MakeBlockProposal builds a new HotStuff block proposal using the given view,
// the given quorum certificate for its parent and [optionally] a timeout certificate for last view (could be nil).
// No errors are expected during normal operation.
// Error Returns:
// - model.NoVoteError if it is not safe for us to vote (our proposal includes our vote)
// for this view. This can happen if we have already proposed or timed out this view.
// - generic error in case of unexpected failure
func (bp *BlockProducer) MakeBlockProposal(view uint64, qc *flow.QuorumCertificate, lastViewTC *flow.TimeoutCertificate) (*flow.Header, error) {
// the custom functions allows us to set some custom fields on the block;
// in hotstuff, we use this for view number and signature-related fields
Expand All @@ -57,9 +62,16 @@ func (bp *BlockProducer) MakeBlockProposal(view uint64, qc *flow.QuorumCertifica
}

signer := newSafetyRulesConcurrencyWrapper(bp.safetyRules)
header, err := bp.builder.BuildOn(qc.BlockID, setHotstuffFields, signer.Sign)
header, err := bp.builder.BuildOn(
qc.BlockID,
setHotstuffFields, // never returns an error
signer.Sign, // may return model.NoVoteError, which we handle below
)
if err != nil {
return nil, fmt.Errorf("could not build block proposal on top of %v: %w", qc.BlockID, err)
if model.IsNoVoteError(err) {
return nil, fmt.Errorf("unsafe to vote for own proposal on top of %x: %w", qc.BlockID, err)
}
return nil, irrecoverable.NewExceptionf("could not build block proposal on top of %v: %w", qc.BlockID, err)
}
if !signer.IsSigningComplete() {
return nil, fmt.Errorf("signer has not yet completed signing")
Expand Down
5 changes: 4 additions & 1 deletion consensus/hotstuff/blockproducer/safety_rules_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,10 @@ func newSafetyRulesConcurrencyWrapper(safetyRules hotstuff.SafetyRules) *safetyR
// Safe under concurrent calls. Per convention, this method should be called exactly once.
// Only the first call will succeed, and subsequent calls error. The implementation is backed
// by `SafetyRules` and thereby guarantees consensus safety for signing block proposals.
// No errors expected during normal operations
// Error Returns:
// - model.NoVoteError if it is not safe for us to vote (our proposal includes our vote)
// for this view. This can happen if we have already proposed or timed out this view.
// - generic error in case of unexpected failure
func (w *safetyRulesConcurrencyWrapper) Sign(unsignedHeader *flow.Header) error {
if !w.signingStatus.CompareAndSwap(0, 1) { // value of `signingStatus` is something else than 0
return fmt.Errorf("signer has already commenced signing; possibly repeated signer call")
Expand Down
61 changes: 6 additions & 55 deletions consensus/hotstuff/eventhandler/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,35 +45,6 @@ type EventHandler struct {
committee hotstuff.Replicas
safetyRules hotstuff.SafetyRules
notifier hotstuff.Consumer

// myLastProposedView is the latest view that this node has created a proposal for
// CAUTION: in-memory only; information will be lost once the node reboots. This is fine for the following reason:
// 1. At the moment, the block construction logic persists its blocks in the database, _before_ returning the
// reference to the EventHandler, which subsequently publishes the block (via the `OnOwnProposal` notification).
// Therefore, for each view that this consensus participant published a block for, this block is also in the database.
// 2. Before constructing a proposal for view v, the node confirms that there is no block for view v already stored in
// its local Forks, and skips the block production otherwise. When the node boots up, it populates Forks with all
// unfinalized blocks. Hence, we conclude:
// Let v be a view for which this node constructed a proposal _before_ rebooting. Then, after rebooting it will never
// construct another proposal for view v.
// 3. What remains is to show that this node will not propose for views which it has previously proposed for without any
// reboots. Conceptually, this is guaranteed by the SafetyRules, but only if voting and signing proposals is _both_
// done by safety rules. Unfortunately, the current implementation signs its own proposals _independently_ of safety rules.
// Hence, we add the following logic:
// - `myLastProposedView` is zero in case this node has not generated any proposal since its last reboot. Then, argument
// 2. guarantees that the node will not double-propose.
// - Whenever this node constructed a proposal for view v, it will set `myLastProposedView` to value `v`, _before_
// publishing the proposal (via the `OnOwnProposal` notification).
// - Only if `v < myLastProposedView`, this node will engage its block production logic for view `v`. Therefore, it is
// guaranteed that this node has not generated any proposal for view v since its last reboot.
// In summary, argument 2. and 3. guarantee that this node will not double-propose (independently of whether the node
// restarted or not). Note that this holds, _without_ the node needing to store newly generated proposals right away in `Forks`.
// On the happy path (no restarts), updating `myLastProposedView` will suffice to prevent creating two proposals for the same view.
// The node's own proposal will be added to Forks _after_ the broadcast the same way as proposals from other nodes.
// On the unhappy path, the node's own proposals will be added to Forks along with unfinalized proposals from other nodes.
// TODO: use safety rules also for block signing, which produces the same guarantees of not double-proposing without this extra logic
// For further details, see issue https://github.com/onflow/flow-go/issues/6389
myLastProposedView uint64
}

var _ hotstuff.EventHandler = (*EventHandler)(nil)
Expand Down Expand Up @@ -369,30 +340,6 @@ func (e *EventHandler) proposeForNewViewIfPrimary() error {
return nil
}

// CASE A: Preventing proposal equivocation on the happy path
// We will never produce two proposals for the same view, _since_ the last reboot. This is because without a reboot,
// we can only proceed once beyond the following lines.
if curView <= e.myLastProposedView {
log.Debug().Msg("already proposed for current view")
return nil
}
e.myLastProposedView = curView

// CASE B: Preventing proposal equivocation on the unhappy path
// We will never produce a proposal for view v, if we already constructed a proposal for the same view _before_ the
// most recent reboot. This is because during proposal construction (further below), (i) the block is saved in the database
// _before_ a reference is returned to the EventHandler and (ii) all unfinalized proposals are added to Forks during the reboot.
for _, b := range e.forks.GetBlocksForView(curView) { // on the happy path, this slice is empty
if b.ProposerID == e.committee.Self() {
log.Debug().Msg("already proposed for current view")
return nil
} else {
// sanity check: the following code should never be reached, as this node is the current leader, i.e.
// we should _not_ consider a proposal for this view from any other as valid and store it in forks.
return fmt.Errorf("this node (%v) is leader for the current view %d, but have a proposal from node %v for this view", currentLeader, curView, b.ProposerID)
}
}

// attempt to generate proposal:
newestQC := e.paceMaker.NewestQC()
lastViewTC := e.paceMaker.LastViewTC()
Expand Down Expand Up @@ -447,12 +394,12 @@ func (e *EventHandler) proposeForNewViewIfPrimary() error {
// except that we are skipping the ComplianceEngine (assuming that our own proposals are protocol-compliant).
//
// Context:
// • On constraint (i): We want to support consensus committees only consisting of a *single* node. If the EvenHandler
// • On constraint (i): We want to support consensus committees only consisting of a *single* node. If the EventHandler
// internally processed the block right away via a direct message call, the call-stack would be ever-growing and
// the node would crash eventually (we experienced this with a very early HotStuff implementation). Specifically,
// if we wanted to process the block directly without taking a detour through the EventLoop's inbound queue,
// we would call `OnReceiveProposal` here. The function `OnReceiveProposal` would then end up calling
// then end up calling `proposeForNewViewIfPrimary` (this function) to generate the next proposal, which again
// `proposeForNewViewIfPrimary` (this function) to generate the next proposal, which again
// would result in calling `OnReceiveProposal` and so on so forth until the call stack or memory limit is reached
// and the node crashes. This is only a problem for consensus committees of size 1.
// • On constraint (ii): When adding a proposal to Forks, Forks emits a `BlockIncorporatedEvent` notification, which
Expand All @@ -470,6 +417,10 @@ func (e *EventHandler) proposeForNewViewIfPrimary() error {
// EventHandler and instead need to put it into the EventLoop's inbound queue to support consensus committees of size 1.
flowProposal, err := e.blockProducer.MakeBlockProposal(curView, newestQC, lastViewTC)
if err != nil {
if model.IsNoVoteError(err) {
log.Info().Err(err).Msg("aborting block proposal to prevent equivocation (likely re-entered proposal logic due to crash)")
return nil
}
return fmt.Errorf("can not make block proposal for curView %v: %w", curView, err)
}
targetPublicationTime := e.paceMaker.TargetPublicationTime(flowProposal.View, start, flowProposal.ParentID) // determine target publication time
Expand Down
19 changes: 16 additions & 3 deletions consensus/hotstuff/eventhandler/event_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,12 +222,25 @@ func NewForks(t *testing.T, finalized uint64) *Forks {
return f
}

// BlockProducer mock will always make a valid block
// BlockProducer mock will always make a valid block, exactly once per view.
// If it is requested to make a block twice for the same view, returns model.NoVoteError
type BlockProducer struct {
proposerID flow.Identifier
proposerID flow.Identifier
producedBlockForView map[uint64]bool
}

// NewBlockProducer creates a BlockProducer mock for the given proposer. The record
// of views for which a block has already been produced starts out empty.
func NewBlockProducer(proposerID flow.Identifier) *BlockProducer {
	producer := new(BlockProducer)
	producer.proposerID = proposerID
	producer.producedBlockForView = map[uint64]bool{}
	return producer
}

func (b *BlockProducer) MakeBlockProposal(view uint64, qc *flow.QuorumCertificate, lastViewTC *flow.TimeoutCertificate) (*flow.Header, error) {
if b.producedBlockForView[view] {
return nil, model.NewNoVoteErrorf("block already produced")
}
b.producedBlockForView[view] = true
return helper.SignedProposalToFlow(helper.MakeSignedProposal(helper.WithProposal(
helper.MakeProposal(helper.WithBlock(helper.MakeBlock(
helper.WithBlockView(view),
Expand Down Expand Up @@ -283,7 +296,7 @@ func (es *EventHandlerSuite) SetupTest() {
es.forks = NewForks(es.T(), finalized)
es.persist = mocks.NewPersister(es.T())
es.persist.On("PutStarted", mock.Anything).Return(nil).Maybe()
es.blockProducer = &BlockProducer{proposerID: es.committee.Self()}
es.blockProducer = NewBlockProducer(es.committee.Self())
es.safetyRules = NewSafetyRules(es.T())
es.notifier = mocks.NewConsumer(es.T())
es.notifier.On("OnEventProcessed").Maybe()
Expand Down
25 changes: 16 additions & 9 deletions consensus/hotstuff/persister.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,30 @@
package hotstuff

// Persister is responsible for persisting state we need to bootstrap after a
// restart or crash.
// Persister is responsible for persisting minimal critical safety and liveness data for HotStuff:
// specifically [hotstuff.LivenessData] and [hotstuff.SafetyData].
type Persister interface {
jordanschalm marked this conversation as resolved.
Show resolved Hide resolved
// GetSafetyData will retrieve last persisted safety data.
// During normal operations, no errors are expected.
GetSafetyData() (*SafetyData, error)
PersisterReader

// PutSafetyData persists the last safety data.
// This method blocks until `safetyData` was successfully persisted.
// During normal operations, no errors are expected.
PutSafetyData(safetyData *SafetyData) error

// GetLivenessData will retrieve last persisted liveness data.
// During normal operations, no errors are expected.
GetLivenessData() (*LivenessData, error)

// PutLivenessData persists the last liveness data.
// This method blocks until `livenessData` was successfully persisted.
// During normal operations, no errors are expected.
PutLivenessData(livenessData *LivenessData) error
}

// PersisterReader exposes only the read-only parts of the Persister component:
// the getters for the last persisted [SafetyData] and [LivenessData].
// It is used by other components to read information about the HotStuff instance's current state.
// CAUTION: the write functions are deliberately hidden here, because it is NOT SAFE to use them
// outside the HotStuff state machine.
type PersisterReader interface {
// GetSafetyData retrieves the last persisted safety data.
// No errors are expected during normal operations.
GetSafetyData() (*SafetyData, error)

// GetLivenessData retrieves the last persisted liveness data.
// No errors are expected during normal operations.
GetLivenessData() (*LivenessData, error)
}
Loading
Loading