Skip to content

Commit

Permalink
fix(fork): drs upgrade when multiple forks fix (#1266)
Browse files Browse the repository at this point in the history
  • Loading branch information
srene authored Dec 8, 2024
1 parent c0e39f9 commit 7de4e89
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 82 deletions.
102 changes: 45 additions & 57 deletions block/fork.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ func (m *Manager) MonitorForkUpdateLoop(ctx context.Context) error {

// checkForkUpdate checks if the hub has a fork update
func (m *Manager) checkForkUpdate(msg string) error {
defer m.forkMu.Unlock()
m.forkMu.Lock()

rollapp, err := m.SLClient.GetRollapp()
if err != nil {
return err
Expand All @@ -50,7 +53,7 @@ func (m *Manager) checkForkUpdate(msg string) error {
expectedRevision = rollapp.GetRevisionForHeight(nextHeight)
)

if shouldStopNode(expectedRevision, nextHeight, actualRevision, m.RunMode) {
if shouldStopNode(expectedRevision, nextHeight, actualRevision) {
instruction, err := m.createInstruction(expectedRevision)
if err != nil {
return err
Expand All @@ -66,7 +69,7 @@ func (m *Manager) checkForkUpdate(msg string) error {
return nil
}

// createInstruction writes file to disk with fork information
// createInstruction returns instruction with fork information
func (m *Manager) createInstruction(expectedRevision types.Revision) (types.Instruction, error) {
obsoleteDrs, err := m.SLClient.GetObsoleteDrs()
if err != nil {
Expand All @@ -91,45 +94,41 @@ func shouldStopNode(
expectedRevision types.Revision,
nextHeight uint64,
actualRevisionNumber uint64,
nodeMode uint,
) bool {
if nodeMode == RunModeFullNode {
return nextHeight > expectedRevision.StartHeight && actualRevisionNumber < expectedRevision.Number
}
return nextHeight >= expectedRevision.StartHeight && actualRevisionNumber < expectedRevision.Number
}

// forkNeeded returns true if the fork file exists
func (m *Manager) forkNeeded() (types.Instruction, bool) {
if instruction, err := types.LoadInstructionFromDisk(m.RootDir); err == nil {
return instruction, true
// getRevisionFromSL returns revision data for the specific height
func (m *Manager) getRevisionFromSL(height uint64) (types.Revision, error) {
rollapp, err := m.SLClient.GetRollapp()
if err != nil {
return types.Revision{}, err
}

return types.Instruction{}, false
return rollapp.GetRevisionForHeight(height), nil
}

// doFork creates fork blocks and submits a new batch with them
func (m *Manager) doFork(instruction types.Instruction) error {
// if fork (two) blocks are not produced and applied yet, produce them
if m.State.Height() < instruction.RevisionStartHeight+1 {
// add consensus msgs for upgrade DRS only if current DRS is obsolete
// add consensus msgs to upgrade DRS to running node version (msg is created in all cases and RDK will upgrade if necessary). If returns error if running version is deprecated.
consensusMsgs, err := m.prepareDRSUpgradeMessages(instruction.FaultyDRS)
if err != nil {
panic(fmt.Sprintf("prepare DRS upgrade messages: %v", err))
return fmt.Errorf("prepare DRS upgrade messages: %v", err)
}
// add consensus msg to bump the account sequences in all fork cases
consensusMsgs = append(consensusMsgs, &sequencers.MsgBumpAccountSequences{Authority: authtypes.NewModuleAddress("sequencers").String()})

// create fork blocks
err = m.createForkBlocks(instruction, consensusMsgs)
if err != nil {
panic(fmt.Sprintf("validate existing blocks: %v", err))
return fmt.Errorf("validate fork blocks: %v", err)
}
}

// submit fork batch including two fork blocks
if err := m.submitForkBatch(instruction.RevisionStartHeight); err != nil {
panic(fmt.Sprintf("ensure batch exists: %v", err))
return fmt.Errorf("submit fork batch: %v", err)
}

return nil
Expand Down Expand Up @@ -226,53 +225,49 @@ func (m *Manager) submitForkBatch(height uint64) error {
return nil
}

// updateStateWhenFork updates dymint state in case fork is detected
func (m *Manager) updateStateWhenFork() error {
// updateStateForNextRevision updates dymint stored state in case next height corresponds to a new revision, to enable syncing (and validation) for rollapps with multiple revisions.
func (m *Manager) updateStateForNextRevision() error {
// in case fork is detected dymint state needs to be updated
if instruction, forkNeeded := m.forkNeeded(); forkNeeded {

// get next revision according to node height
nextRevision, err := m.getRevisionFromSL(m.State.NextHeight())
if err != nil {
return err
}

// if next height is revision start height, update local state
if nextRevision.StartHeight == m.State.NextHeight() {
// Set proposer to nil to force updating it from SL
m.State.SetProposer(nil)
// Upgrade revision on state
m.State.RevisionStartHeight = instruction.RevisionStartHeight
// this is necessary to pass ValidateConfigWithRollappParams when DRS upgrade is required
if instruction.RevisionStartHeight == m.State.NextHeight() {
m.State.SetRevision(instruction.Revision)
drsVersion, err := version.GetDRSVersion()
if err != nil {
return err
}
m.State.RollappParams.DrsVersion = drsVersion
m.State.RevisionStartHeight = nextRevision.StartHeight
m.State.SetRevision(nextRevision.Number)

// we set rollappparam to node drs version to pass ValidateConfigWithRollappParams check, when drs upgrade is necessary.
// if the node starts with the wrong version at revision start height, it will stop after applyBlock.
drsVersion, err := version.GetDRSVersion()
if err != nil {
return err
}
_, err := m.Store.SaveState(m.State, nil)
m.State.RollappParams.DrsVersion = drsVersion
// update stored state
_, err = m.Store.SaveState(m.State, nil)
return err
}
return nil
}

// checkRevisionAndFork checks if fork is needed after syncing, and performs fork actions
func (m *Manager) checkRevisionAndFork() error {
// it is checked again whether the node is the active proposer, since this could have changed after syncing.
amIProposerOnSL, err := m.AmIProposerOnSL()
if err != nil {
return fmt.Errorf("am i proposer on SL: %w", err)
}
if !amIProposerOnSL {
return fmt.Errorf("the node is no longer the proposer. please restart.")
}
// doForkWhenNewRevision creates and submit to SL fork blocks according to next revision start height.
func (m *Manager) doForkWhenNewRevision() error {
defer m.forkMu.Unlock()
m.forkMu.Lock()

// update sequencer in case it changed after syncing
err = m.UpdateProposerFromSL()
// get revision next height
expectedRevision, err := m.getRevisionFromSL(m.State.NextHeight())
if err != nil {
return err
}

// get the revision for the current height to check against local state
rollapp, err := m.SLClient.GetRollapp()
if err != nil {
return err
}
expectedRevision := rollapp.GetRevisionForHeight(m.State.NextHeight())

// create fork batch in case it has not been submitted yet
if m.LastSettlementHeight.Load() < expectedRevision.StartHeight {
instruction, err := m.createInstruction(expectedRevision)
Expand All @@ -293,13 +288,6 @@ func (m *Manager) checkRevisionAndFork() error {
panic("Inconsistent expected revision number from Hub. Unable to fork")
}

// remove instruction file after fork to avoid enter fork loop again
if _, instructionExists := m.forkNeeded(); instructionExists {
err := types.DeleteInstructionFromDisk(m.RootDir)
if err != nil {
return fmt.Errorf("deleting instruction file: %w", err)
}
}

return nil
// remove instruction file after fork
return types.DeleteInstructionFromDisk(m.RootDir)
}
2 changes: 1 addition & 1 deletion block/fork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func TestShouldStopNode(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
expectedRevision := tt.rollapp.GetRevisionForHeight(tt.height)
result := shouldStopNode(expectedRevision, tt.height, tt.block.Header.Version.App, tt.runMode)
result := shouldStopNode(expectedRevision, tt.height, tt.block.Header.Version.App)
assert.Equal(t, tt.expected, result)
})
}
Expand Down
17 changes: 8 additions & 9 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ type Manager struct {

// LastBlockTime is the time of last produced block, used to measure batch skew time
LastBlockTime atomic.Int64

// mutex used to avoid stopping node when fork is detected but proposer is creating/sending fork batch
forkMu sync.Mutex
/*
Sequencer and full-node
*/
Expand Down Expand Up @@ -193,8 +196,8 @@ func NewManager(
return nil, err
}

// update dymint state with fork info
err = m.updateStateWhenFork()
// update dymint state with next revision info
err = m.updateStateForNextRevision()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -247,13 +250,9 @@ func (m *Manager) Start(ctx context.Context) error {
return fmt.Errorf("am i proposer on SL: %w", err)
}

if amIProposerOnSL || m.AmIProposerOnRollapp() {
m.RunMode = RunModeProposer
} else {
m.RunMode = RunModeFullNode
}
amIProposer := amIProposerOnSL || m.AmIProposerOnRollapp()

m.logger.Info("starting block manager", "mode", map[bool]string{true: "proposer", false: "full node"}[m.RunMode == RunModeProposer])
m.logger.Info("starting block manager", "mode", map[bool]string{true: "proposer", false: "full node"}[amIProposer])

// update local state from latest state in settlement
err = m.updateFromLastSettlementState()
Expand Down Expand Up @@ -292,7 +291,7 @@ func (m *Manager) Start(ctx context.Context) error {
})

// run based on the node role
if m.RunMode == RunModeFullNode {
if !amIProposer {
return m.runAsFullNode(ctx, eg)
}

Expand Down
32 changes: 21 additions & 11 deletions block/modes.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const (
// setFraudHandler sets the fraud handler for the block manager.
func (m *Manager) runAsFullNode(ctx context.Context, eg *errgroup.Group) error {
m.logger.Info("starting block manager", "mode", "full node")
m.RunMode = RunModeFullNode
// update latest finalized height
err := m.updateLastFinalizedHeightFromSettlement()
if err != nil {
Expand All @@ -36,19 +37,13 @@ func (m *Manager) runAsFullNode(ctx context.Context, eg *errgroup.Group) error {

m.subscribeFullNodeEvents(ctx)

if _, instructionExists := m.forkNeeded(); instructionExists {
// remove instruction file after fork to avoid enter fork loop again
err := types.DeleteInstructionFromDisk(m.RootDir)
if err != nil {
return fmt.Errorf("deleting instruction file: %w", err)
}
}

return nil
// remove instruction file after fork to avoid enter fork loop again
return types.DeleteInstructionFromDisk(m.RootDir)
}

func (m *Manager) runAsProposer(ctx context.Context, eg *errgroup.Group) error {
m.logger.Info("starting block manager", "mode", "proposer")
m.RunMode = RunModeProposer
// Subscribe to batch events, to update last submitted height in case batch confirmation was lost. This could happen if the sequencer crash/restarted just after submitting a batch to the settlement and by the time we query the last batch, this batch wasn't accepted yet.
go uevent.MustSubscribe(ctx, m.Pubsub, "updateSubmittedHeightLoop", settlement.EventQueryNewSettlementBatchAccepted, m.UpdateLastSubmittedHeight, m.logger)
// Subscribe to P2P received blocks events (used for P2P syncing).
Expand All @@ -61,8 +56,23 @@ func (m *Manager) runAsProposer(ctx context.Context, eg *errgroup.Group) error {
// Sequencer must wait till node is synced till last submittedHeight, in case it is not
m.waitForSettlementSyncing()

// checkRevisionAndFork executes fork if necessary
err := m.checkRevisionAndFork()
// it is checked again whether the node is the active proposer, since this could have changed after syncing.
amIProposerOnSL, err := m.AmIProposerOnSL()
if err != nil {
return fmt.Errorf("am i proposer on SL: %w", err)
}
if !amIProposerOnSL {
return fmt.Errorf("the node is no longer the proposer. please restart.")
}

// update l2 proposer from SL in case it changed after syncing
err = m.UpdateProposerFromSL()
if err != nil {
return err
}

// doForkWhenNewRevision executes fork if necessary
err = m.doForkWhenNewRevision()
if err != nil {
return err
}
Expand Down
2 changes: 2 additions & 0 deletions block/production_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ func TestUpdateInitialSequencerSet(t *testing.T) {
slmock.On("Start").Return(nil)
slmock.On("GetProposer").Return(proposer)
slmock.On("GetAllSequencers").Return([]types.Sequencer{proposer, sequencer}, nil)
slmock.On("GetRollapp").Return(&types.Rollapp{}, nil)

manager, err := testutil.GetManagerWithProposerKey(testutil.GetManagerConfig(), lib2pPrivKey, slmock, 1, 1, 0, proxyApp, nil)
require.NoError(err)
Expand Down Expand Up @@ -459,6 +460,7 @@ func TestUpdateExistingSequencerSet(t *testing.T) {
slmock.On("Init", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
slmock.On("Start").Return(nil)
slmock.On("GetProposer").Return(proposer)
slmock.On("GetRollapp").Return(&types.Rollapp{}, nil)

manager, err := testutil.GetManagerWithProposerKey(testutil.GetManagerConfig(), lib2pPrivKey, slmock, 1, 1, 0, proxyApp, nil)
require.NoError(err)
Expand Down
1 change: 1 addition & 0 deletions block/submit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ func TestBatchSubmissionFailedSubmission(t *testing.T) {
slmock.On("Init", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
slmock.On("Start").Return(nil)
slmock.On("GetProposer").Return(proposer)
slmock.On("GetRollapp").Return(&types.Rollapp{}, nil)

manager, err := testutil.GetManagerWithProposerKey(testutil.GetManagerConfig(), lib2pPrivKey, slmock, 1, 1, 0, proxyApp, nil)
require.NoError(err)
Expand Down
10 changes: 6 additions & 4 deletions types/instruction.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package types

import (
"encoding/json"
"fmt"
"os"
"path/filepath"
)
Expand Down Expand Up @@ -52,11 +53,12 @@ func InstructionExists(dir string) bool {
}

func DeleteInstructionFromDisk(dir string) error {
if !InstructionExists(dir) {
return nil
}
filePath := filepath.Join(dir, instructionFileName)
err := os.Remove(filePath)
if err != nil {
return err
if err := os.Remove(filePath); err != nil {
return fmt.Errorf("deleting instruction file: %w", err)
}

return nil
}

0 comments on commit 7de4e89

Please sign in to comment.