fix(fork): drs upgrade when multiple forks fix #1266

Merged: 11 commits, Dec 8, 2024
102 changes: 45 additions & 57 deletions block/fork.go
@@ -39,6 +39,9 @@ func (m *Manager) MonitorForkUpdateLoop(ctx context.Context) error {

// checkForkUpdate checks if the hub has a fork update
func (m *Manager) checkForkUpdate(msg string) error {
defer m.forkMu.Unlock()
m.forkMu.Lock()

rollapp, err := m.SLClient.GetRollapp()
if err != nil {
return err
@@ -50,7 +53,7 @@ func (m *Manager) checkForkUpdate(msg string) error {
expectedRevision = rollapp.GetRevisionForHeight(nextHeight)
)

if shouldStopNode(expectedRevision, nextHeight, actualRevision, m.RunMode) {
if shouldStopNode(expectedRevision, nextHeight, actualRevision) {
instruction, err := m.createInstruction(expectedRevision)
if err != nil {
return err
@@ -66,7 +69,7 @@ func (m *Manager) checkForkUpdate(msg string) error {
return nil
}

// createInstruction writes file to disk with fork information
// createInstruction returns instruction with fork information
func (m *Manager) createInstruction(expectedRevision types.Revision) (types.Instruction, error) {
obsoleteDrs, err := m.SLClient.GetObsoleteDrs()
if err != nil {
@@ -91,45 +94,41 @@ func shouldStopNode(
expectedRevision types.Revision,
nextHeight uint64,
actualRevisionNumber uint64,
nodeMode uint,
) bool {
if nodeMode == RunModeFullNode {
return nextHeight > expectedRevision.StartHeight && actualRevisionNumber < expectedRevision.Number
}
return nextHeight >= expectedRevision.StartHeight && actualRevisionNumber < expectedRevision.Number
}
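
For illustration, a minimal sketch of the simplified check (hypothetical heights and revision numbers, not part of the diff): with the node-mode branch removed, any node whose next height has reached the revision start height and whose revision number (app version) is behind the expected revision is told to stop.

expected := types.Revision{StartHeight: 100, Number: 2} // hypothetical values
shouldStopNode(expected, 100, 1) // true: start height reached and revision is behind
shouldStopNode(expected, 99, 1)  // false: revision start height not reached yet
shouldStopNode(expected, 100, 2) // false: node already runs the expected revision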

// forkNeeded returns true if the fork file exists
func (m *Manager) forkNeeded() (types.Instruction, bool) {
if instruction, err := types.LoadInstructionFromDisk(m.RootDir); err == nil {
return instruction, true
// getRevisionFromSL returns revision data for the specific height
func (m *Manager) getRevisionFromSL(height uint64) (types.Revision, error) {
rollapp, err := m.SLClient.GetRollapp()
if err != nil {
return types.Revision{}, err
}

return types.Instruction{}, false
return rollapp.GetRevisionForHeight(height), nil
}

// doFork creates fork blocks and submits a new batch with them
func (m *Manager) doFork(instruction types.Instruction) error {
// if fork (two) blocks are not produced and applied yet, produce them
if m.State.Height() < instruction.RevisionStartHeight+1 {
// add consensus msgs for upgrade DRS only if current DRS is obsolete
// add consensus msgs to upgrade DRS to the running node version (the msg is created in all cases and the RDK will upgrade only if necessary). It returns an error if the running version is deprecated.
consensusMsgs, err := m.prepareDRSUpgradeMessages(instruction.FaultyDRS)
if err != nil {
panic(fmt.Sprintf("prepare DRS upgrade messages: %v", err))
return fmt.Errorf("prepare DRS upgrade messages: %v", err)
}
// add consensus msg to bump the account sequences in all fork cases
consensusMsgs = append(consensusMsgs, &sequencers.MsgBumpAccountSequences{Authority: authtypes.NewModuleAddress("sequencers").String()})

// create fork blocks
err = m.createForkBlocks(instruction, consensusMsgs)
if err != nil {
panic(fmt.Sprintf("validate existing blocks: %v", err))
return fmt.Errorf("validate fork blocks: %v", err)
}
}

// submit fork batch including two fork blocks
if err := m.submitForkBatch(instruction.RevisionStartHeight); err != nil {
panic(fmt.Sprintf("ensure batch exists: %v", err))
return fmt.Errorf("submit fork batch: %v", err)
}

return nil
@@ -226,53 +225,49 @@ func (m *Manager) submitForkBatch(height uint64) error {
return nil
}

// updateStateWhenFork updates dymint state in case fork is detected
func (m *Manager) updateStateWhenFork() error {
// updateStateForNextRevision updates the stored dymint state when the next height corresponds to a new revision, to enable syncing (and validation) for rollapps with multiple revisions.
func (m *Manager) updateStateForNextRevision() error {
// in case fork is detected dymint state needs to be updated
Contributor review comment on the line above:
redundant comment as it doesn't add anything on top of the function godoc.
we should strive to have more reasoning in our comments, not just explaining what we do (but why we do it).
needs to be updated because..

if instruction, forkNeeded := m.forkNeeded(); forkNeeded {

// get next revision according to node height
nextRevision, err := m.getRevisionFromSL(m.State.NextHeight())
if err != nil {
return err
}

// if next height is revision start height, update local state
if nextRevision.StartHeight == m.State.NextHeight() {
// Set proposer to nil to force updating it from SL
m.State.SetProposer(nil)
// Upgrade revision on state
m.State.RevisionStartHeight = instruction.RevisionStartHeight
// this is necessary to pass ValidateConfigWithRollappParams when DRS upgrade is required
if instruction.RevisionStartHeight == m.State.NextHeight() {
m.State.SetRevision(instruction.Revision)
drsVersion, err := version.GetDRSVersion()
if err != nil {
return err
}
m.State.RollappParams.DrsVersion = drsVersion
m.State.RevisionStartHeight = nextRevision.StartHeight
m.State.SetRevision(nextRevision.Number)

// we set the rollapp param to the node DRS version to pass the ValidateConfigWithRollappParams check when a DRS upgrade is necessary.
// if the node starts with the wrong version at revision start height, it will stop after applyBlock.
drsVersion, err := version.GetDRSVersion()
if err != nil {
return err
}
_, err := m.Store.SaveState(m.State, nil)
m.State.RollappParams.DrsVersion = drsVersion
// update stored state
_, err = m.Store.SaveState(m.State, nil)
return err
}
return nil
}
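
A minimal usage sketch of the condition above (hypothetical heights, not part of the diff): the stored state is only rewritten when the next height is exactly the revision start height reported by the settlement layer; at any other height the function is a no-op.

nextRevision := types.Revision{StartHeight: 100, Number: 2} // hypothetical SL response
if nextRevision.StartHeight == m.State.NextHeight() { // true only when the next height is 100
	// proposer is reset, revision bumped to 2, node DRS version written, state persisted
}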

// checkRevisionAndFork checks if fork is needed after syncing, and performs fork actions
func (m *Manager) checkRevisionAndFork() error {
// it is checked again whether the node is the active proposer, since this could have changed after syncing.
amIProposerOnSL, err := m.AmIProposerOnSL()
if err != nil {
return fmt.Errorf("am i proposer on SL: %w", err)
}
if !amIProposerOnSL {
return fmt.Errorf("the node is no longer the proposer. please restart.")
}
// doForkWhenNewRevision creates fork blocks and submits them to the SL according to the next revision start height.
func (m *Manager) doForkWhenNewRevision() error {
defer m.forkMu.Unlock()
m.forkMu.Lock()

// update sequencer in case it changed after syncing
err = m.UpdateProposerFromSL()
// get the expected revision for the next height
expectedRevision, err := m.getRevisionFromSL(m.State.NextHeight())
if err != nil {
return err
}

// get the revision for the current height to check against local state
rollapp, err := m.SLClient.GetRollapp()
if err != nil {
return err
}
expectedRevision := rollapp.GetRevisionForHeight(m.State.NextHeight())

// create fork batch in case it has not been submitted yet
if m.LastSettlementHeight.Load() < expectedRevision.StartHeight {
instruction, err := m.createInstruction(expectedRevision)
@@ -293,13 +288,6 @@ func (m *Manager) checkRevisionAndFork() error {
panic("Inconsistent expected revision number from Hub. Unable to fork")
}

// remove instruction file after fork to avoid enter fork loop again
if _, instructionExists := m.forkNeeded(); instructionExists {
err := types.DeleteInstructionFromDisk(m.RootDir)
if err != nil {
return fmt.Errorf("deleting instruction file: %w", err)
}
}

return nil
// remove instruction file after fork
return types.DeleteInstructionFromDisk(m.RootDir)
}
2 changes: 1 addition & 1 deletion block/fork_test.go
@@ -87,7 +87,7 @@ func TestShouldStopNode(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
expectedRevision := tt.rollapp.GetRevisionForHeight(tt.height)
result := shouldStopNode(expectedRevision, tt.height, tt.block.Header.Version.App, tt.runMode)
result := shouldStopNode(expectedRevision, tt.height, tt.block.Header.Version.App)
assert.Equal(t, tt.expected, result)
})
}
17 changes: 8 additions & 9 deletions block/manager.go
@@ -78,6 +78,9 @@ type Manager struct {

// LastBlockTime is the time of last produced block, used to measure batch skew time
LastBlockTime atomic.Int64

// mutex used to avoid stopping the node when a fork is detected while the proposer is creating/sending the fork batch
forkMu sync.Mutex
/*
Sequencer and full-node
*/
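
A minimal sketch of the locking pattern this field enables, written in the conventional Lock-then-defer order (method bodies elided; it mirrors the forkMu calls shown in block/fork.go above): the proposer holds forkMu while it creates and submits the fork batch, so checkForkUpdate cannot decide to stop the node halfway through that work.

func (m *Manager) checkForkUpdate(msg string) error {
	m.forkMu.Lock()
	defer m.forkMu.Unlock()
	// query the hub and, if a fork is expected, create the instruction and stop the node
	return nil
}

func (m *Manager) doForkWhenNewRevision() error {
	m.forkMu.Lock()
	defer m.forkMu.Unlock()
	// create fork blocks and submit the fork batch before the lock is released
	return nil
}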
@@ -193,8 +196,8 @@ func NewManager(
return nil, err
}

// update dymint state with fork info
err = m.updateStateWhenFork()
// update dymint state with next revision info
err = m.updateStateForNextRevision()
if err != nil {
return nil, err
}
@@ -247,13 +250,9 @@ func (m *Manager) Start(ctx context.Context) error {
return fmt.Errorf("am i proposer on SL: %w", err)
}

if amIProposerOnSL || m.AmIProposerOnRollapp() {
m.RunMode = RunModeProposer
} else {
m.RunMode = RunModeFullNode
}
amIProposer := amIProposerOnSL || m.AmIProposerOnRollapp()

m.logger.Info("starting block manager", "mode", map[bool]string{true: "proposer", false: "full node"}[m.RunMode == RunModeProposer])
m.logger.Info("starting block manager", "mode", map[bool]string{true: "proposer", false: "full node"}[amIProposer])

// update local state from latest state in settlement
err = m.updateFromLastSettlementState()
@@ -292,7 +291,7 @@ func (m *Manager) Start(ctx context.Context) error {
})

// run based on the node role
if m.RunMode == RunModeFullNode {
if !amIProposer {
return m.runAsFullNode(ctx, eg)
}

32 changes: 21 additions & 11 deletions block/modes.go
@@ -23,6 +23,7 @@ const (
// setFraudHandler sets the fraud handler for the block manager.
func (m *Manager) runAsFullNode(ctx context.Context, eg *errgroup.Group) error {
m.logger.Info("starting block manager", "mode", "full node")
m.RunMode = RunModeFullNode
// update latest finalized height
err := m.updateLastFinalizedHeightFromSettlement()
if err != nil {
@@ -36,19 +37,13 @@ func (m *Manager) runAsFullNode(ctx context.Context, eg *errgroup.Group) error {

m.subscribeFullNodeEvents(ctx)

if _, instructionExists := m.forkNeeded(); instructionExists {
// remove instruction file after fork to avoid enter fork loop again
err := types.DeleteInstructionFromDisk(m.RootDir)
if err != nil {
return fmt.Errorf("deleting instruction file: %w", err)
}
}

return nil
// remove instruction file after fork to avoid entering the fork loop again
return types.DeleteInstructionFromDisk(m.RootDir)
}

func (m *Manager) runAsProposer(ctx context.Context, eg *errgroup.Group) error {
m.logger.Info("starting block manager", "mode", "proposer")
m.RunMode = RunModeProposer
// Subscribe to batch events, to update last submitted height in case batch confirmation was lost. This could happen if the sequencer crashed/restarted just after submitting a batch to the settlement and, by the time we query the last batch, this batch wasn't accepted yet.
go uevent.MustSubscribe(ctx, m.Pubsub, "updateSubmittedHeightLoop", settlement.EventQueryNewSettlementBatchAccepted, m.UpdateLastSubmittedHeight, m.logger)
// Subscribe to P2P received blocks events (used for P2P syncing).
@@ -61,8 +56,23 @@ func (m *Manager) runAsProposer(ctx context.Context, eg *errgroup.Group) error {
// Sequencer must wait till node is synced till last submittedHeight, in case it is not
m.waitForSettlementSyncing()

// checkRevisionAndFork executes fork if necessary
err := m.checkRevisionAndFork()
// it is checked again whether the node is the active proposer, since this could have changed after syncing.
amIProposerOnSL, err := m.AmIProposerOnSL()
if err != nil {
return fmt.Errorf("am i proposer on SL: %w", err)
}
if !amIProposerOnSL {
return fmt.Errorf("the node is no longer the proposer. please restart.")
}

// update l2 proposer from SL in case it changed after syncing
err = m.UpdateProposerFromSL()
if err != nil {
return err
}

// doForkWhenNewRevision executes fork if necessary
err = m.doForkWhenNewRevision()
if err != nil {
return err
}
2 changes: 2 additions & 0 deletions block/production_test.go
@@ -329,6 +329,7 @@ func TestUpdateInitialSequencerSet(t *testing.T) {
slmock.On("Start").Return(nil)
slmock.On("GetProposer").Return(proposer)
slmock.On("GetAllSequencers").Return([]types.Sequencer{proposer, sequencer}, nil)
slmock.On("GetRollapp").Return(&types.Rollapp{}, nil)

manager, err := testutil.GetManagerWithProposerKey(testutil.GetManagerConfig(), lib2pPrivKey, slmock, 1, 1, 0, proxyApp, nil)
require.NoError(err)
@@ -459,6 +460,7 @@ func TestUpdateExistingSequencerSet(t *testing.T) {
slmock.On("Init", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
slmock.On("Start").Return(nil)
slmock.On("GetProposer").Return(proposer)
slmock.On("GetRollapp").Return(&types.Rollapp{}, nil)

manager, err := testutil.GetManagerWithProposerKey(testutil.GetManagerConfig(), lib2pPrivKey, slmock, 1, 1, 0, proxyApp, nil)
require.NoError(err)
1 change: 1 addition & 0 deletions block/submit_test.go
@@ -193,6 +193,7 @@ func TestBatchSubmissionFailedSubmission(t *testing.T) {
slmock.On("Init", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
slmock.On("Start").Return(nil)
slmock.On("GetProposer").Return(proposer)
slmock.On("GetRollapp").Return(&types.Rollapp{}, nil)

manager, err := testutil.GetManagerWithProposerKey(testutil.GetManagerConfig(), lib2pPrivKey, slmock, 1, 1, 0, proxyApp, nil)
require.NoError(err)
10 changes: 6 additions & 4 deletions types/instruction.go
@@ -2,6 +2,7 @@ package types

import (
"encoding/json"
"fmt"
"os"
"path/filepath"
)
@@ -52,11 +53,12 @@ func InstructionExists(dir string) bool {
}

func DeleteInstructionFromDisk(dir string) error {
if !InstructionExists(dir) {
return nil
}
filePath := filepath.Join(dir, instructionFileName)
err := os.Remove(filePath)
if err != nil {
return err
if err := os.Remove(filePath); err != nil {
return fmt.Errorf("deleting instruction file: %w", err)
}

return nil
}
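
A minimal usage sketch (hypothetical directory; assumes the dymint types package is imported): with the InstructionExists guard added above, deleting a missing instruction file is a no-op, so callers such as runAsFullNode and doForkWhenNewRevision can invoke it unconditionally.

dir := os.TempDir() // hypothetical root dir
if err := types.DeleteInstructionFromDisk(dir); err != nil {
	// reached only when the file exists and os.Remove fails
	log.Fatal(err)
}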