
merge
srene committed Nov 22, 2024
2 parents 2a6881d + 98a3348 commit bbe0909
Showing 23 changed files with 793 additions and 139 deletions.
14 changes: 14 additions & 0 deletions block/block.go
@@ -133,6 +133,7 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
// 1. Save the proposer for the current height to the store.
// 2. Update the proposer in the state in case of rotation.
// 3. Save the state to the store (independently of the height). Here the proposer might differ from (1).
// 4. Save the last block sequencer set to the store if it's present (only applicable in the sequencer mode).
// here, (3) helps properly handle reboots (specifically when there's rotation).
// If reboot happens after block H (which rotates seqA -> seqB):
// - Block H+1 will be signed by seqB.
@@ -162,6 +163,19 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
return fmt.Errorf("update state: %w", err)
}

// 4. Save the last block sequencer set to the store if it's present (only applicable in the sequencer mode).
// The set from the state is dumped to memory on reboots. It helps to avoid sending unnecessary
// UpsertSequencer consensus messages on reboots. This is not a 100% solution, because the sequencer set
// is not persisted in the store in full node mode. It's only used in the proposer mode. Therefore,
// on rotation from the full node to the proposer, the sequencer set is duplicated as consensus msgs.
// Though a one-time duplication is not a big deal.
if len(blockMetaData.SequencerSet) != 0 {
batch, err = m.Store.SaveLastBlockSequencerSet(blockMetaData.SequencerSet, batch)
if err != nil {
return fmt.Errorf("save last block sequencer set: %w", err)
}
}

err = batch.Commit()
if err != nil {
return fmt.Errorf("commit state: %w", err)
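Note that the new step (4) appends to the same write batch that carries the state update, so the sequencer set and the state are committed atomically by the `batch.Commit()` call shown above. Below is a minimal, self-contained sketch of that pattern; `Batch`, `Sequencer`, and the key name are illustrative stand-ins, not the actual dymint store API.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Batch is a toy stand-in for the store's write batch (illustrative only).
type Batch map[string][]byte

// Sequencer is a toy stand-in for types.Sequencer.
type Sequencer struct{ Address, RewardAddr string }

// saveLastBlockSequencerSet adds the set to an existing batch so that it is
// committed together with the state update, mirroring step (4) above.
func saveLastBlockSequencerSet(seqs []Sequencer, batch Batch) (Batch, error) {
	blob, err := json.Marshal(seqs)
	if err != nil {
		return batch, fmt.Errorf("marshal sequencer set: %w", err)
	}
	batch["lastBlockSequencerSet"] = blob
	return batch, nil
}

func main() {
	batch := Batch{"state": []byte("...")} // the state update already sits in the batch
	batch, err := saveLastBlockSequencerSet([]Sequencer{{Address: "seqA"}}, batch)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(batch)) // 2: state and sequencer set commit together
}
```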
2 changes: 1 addition & 1 deletion block/fork.go
@@ -36,7 +36,7 @@ func (m *Manager) MonitorForkUpdateLoop(ctx context.Context) error {
}
select {
case <-ctx.Done():
return ctx.Err()
return nil
case <-ticker.C:
}
}
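The loops touched by this commit (here, and in `ProduceBlockLoop` and `PruningLoop` below) now return `nil` rather than `ctx.Err()` on context cancellation, so a normal shutdown is not reported as a failure by the errgroup that supervises them. A small sketch of the effect, assuming the loop runs under `golang.org/x/sync/errgroup` as in `block/modes.go`:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

// loop mirrors the monitor loops above: it exits cleanly when the context is
// cancelled instead of propagating ctx.Err().
func loop(ctx context.Context) error {
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return nil // returning ctx.Err() here would make eg.Wait() treat a normal shutdown as an error
		case <-ticker.C:
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	eg, ctx := errgroup.WithContext(ctx)
	eg.Go(func() error { return loop(ctx) })

	time.Sleep(30 * time.Millisecond)
	cancel()
	fmt.Println("eg.Wait:", eg.Wait()) // <nil>: graceful shutdown, nothing to freeze the node over
}
```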
6 changes: 1 addition & 5 deletions block/manager.go
@@ -270,11 +270,7 @@ func (m *Manager) Start(ctx context.Context) error {

// Start the settlement sync loop in the background
uerrors.ErrGroupGoLog(eg, m.logger, func() error {
err := m.SettlementSyncLoop(ctx)
if err != nil {
m.freezeNode(err)
}
return nil
return m.SettlementSyncLoop(ctx)
})

// Monitor sequencer set updates
1 change: 0 additions & 1 deletion block/manager_test.go
@@ -361,7 +361,6 @@ func TestApplyLocalBlock_WithFraudCheck(t *testing.T) {
mockExecutor.On("UpdateStateAfterInitChain", mock.Anything, mock.Anything).Return(nil)
mockExecutor.On("UpdateMempoolAfterInitChain", mock.Anything).Return(nil)
mockExecutor.On("ExecuteBlock", mock.Anything, mock.Anything).Return(nil, gerrc.ErrFault)
mockExecutor.On("AddConsensusMsgs", mock.Anything).Return()

// Check that handle fault is called
mockFraudHandler := &blockmocks.MockFraudHandler{}
18 changes: 16 additions & 2 deletions block/modes.go
@@ -2,6 +2,7 @@ package block

import (
"context"
"errors"
"fmt"

"github.com/dymensionxyz/dymint/p2p"
@@ -71,7 +72,7 @@ func (m *Manager) runAsProposer(ctx context.Context, eg *errgroup.Group) error {
return fmt.Errorf("checking should rotate: %w", err)
}
if shouldRotate {
m.rotate(ctx)
m.rotate(ctx) // panics afterwards
}

// populate the bytes produced channel
@@ -87,7 +88,20 @@ func (m *Manager) runAsProposer(ctx context.Context, eg *errgroup.Group) error {
})

// Monitor and handling of the rotation
go m.MonitorProposerRotation(ctx)
uerrors.ErrGroupGoLog(eg, m.logger, func() error {
return m.MonitorProposerRotation(ctx)
})

go func() {
err = eg.Wait()
// Check if loops exited due to sequencer rotation signal
if errors.Is(err, errRotationRequested) {
m.rotate(ctx)
} else if err != nil {
m.logger.Error("block manager exited with error", "error", err)
m.freezeNode(err)
}
}()

return nil
}
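The rotation monitor now runs inside the same errgroup as the production loops and signals rotation by returning the `errRotationRequested` sentinel (defined in `block/sequencers.go` below); the goroutine around `eg.Wait()` then distinguishes that signal from a genuine failure. A compressed sketch of the pattern, with `rotate`/`freezeNode` replaced by prints:

```go
package main

import (
	"context"
	"errors"
	"fmt"

	"golang.org/x/sync/errgroup"
)

var errRotationRequested = errors.New("sequencer rotation started. signal to stop production")

func main() {
	eg, _ := errgroup.WithContext(context.Background())

	// A monitor loop returns the sentinel once rotation is detected; the error
	// cancels the group context, which stops the sibling production loops.
	eg.Go(func() error { return errRotationRequested })

	// In runAsProposer this wait runs in a background goroutine so Start can
	// return; it is inlined here to keep the sketch deterministic.
	err := eg.Wait()
	switch {
	case errors.Is(err, errRotationRequested):
		fmt.Println("rotate: hand over to the next proposer") // m.rotate(ctx) in the real code
	case err != nil:
		fmt.Println("freeze node:", err) // m.freezeNode(err) for unexpected failures
	}
}
```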
54 changes: 52 additions & 2 deletions block/produce.go
@@ -38,7 +38,7 @@ func (m *Manager) ProduceBlockLoop(ctx context.Context, bytesProducedC chan int)
for {
select {
case <-ctx.Done():
return ctx.Err()
return nil
case <-ticker.C:
// Only produce if I'm the current rollapp proposer.
if !m.AmIProposerOnRollapp() {
@@ -119,13 +119,18 @@ func (m *Manager) ProduceApplyGossipBlock(ctx context.Context, opts ProduceBlock
}

func (m *Manager) produceApplyGossip(ctx context.Context, opts ProduceBlockOptions) (block *types.Block, commit *types.Commit, err error) {
newSequencerSet, err := m.SnapshotSequencerSet()
if err != nil {
return nil, nil, fmt.Errorf("snapshot sequencer set: %w", err)
}

// If I'm not the current rollapp proposer, I should not produce blocks.
block, commit, err = m.produceBlock(opts)
if err != nil {
return nil, nil, fmt.Errorf("produce block: %w", err)
}

if err := m.applyBlock(block, commit, types.BlockMetaData{Source: types.Produced}); err != nil {
if err := m.applyBlock(block, commit, types.BlockMetaData{Source: types.Produced, SequencerSet: newSequencerSet}); err != nil {
return nil, nil, fmt.Errorf("apply block: %w: %w", err, ErrNonRecoverable)
}

@@ -136,6 +141,51 @@ func (m *Manager) produceApplyGossip(ctx context.Context, opts ProduceBlockOptio
return block, commit, nil
}

// SnapshotSequencerSet loads two versions of the sequencer set:
// - the one that was used for the last block (from the store)
// - and the most recent one (from the manager memory)
//
// It then calculates the diff between the two and creates consensus messages for the new sequencers,
// i.e., only for the diff between two sets. If there is any diff (i.e., the sequencer set is updated),
// the method returns the entire new set. The new set will be used for the next block and will be stored
// in the state instead of the old set after the block production.
//
// The set from the state is dumped to memory on reboots. It helps to avoid sending unnecessary
// UpsertSequencer consensus messages on reboots. This is not a 100% solution, because the sequencer set
// is not persisted in the store in full node mode. It's only used in the proposer mode. Therefore,
// on rotation from the full node to the proposer, the sequencer set is duplicated as consensus msgs.
// Though a one-time duplication is not a big deal.
func (m *Manager) SnapshotSequencerSet() (sequencersAfterUpdate types.Sequencers, err error) {
// the most recent sequencer set
sequencersAfterUpdate = m.Sequencers.GetAll()

// the sequencer set that was used for the last block
lastSequencers, err := m.Store.LoadLastBlockSequencerSet()
// it's okay if the last sequencer set is not found, it can happen on genesis or after
// rotation from the full node to the proposer
if err != nil && !errors.Is(err, gerrc.ErrNotFound) {
return nil, fmt.Errorf("load last block sequencer set: %w", err)
}

// diff between the two sequencer sets
newSequencers := types.SequencerListRightOuterJoin(lastSequencers, sequencersAfterUpdate)

if len(newSequencers) == 0 {
// nothing to upsert, nothing to persist
return nil, nil
}

// create consensus msgs for new sequencers
msgs, err := ConsensusMsgsOnSequencerSetUpdate(newSequencers)
if err != nil {
return nil, fmt.Errorf("consensus msgs on sequencers set update: %w", err)
}
m.Executor.AddConsensusMsgs(msgs...)

// return the entire new set if there is any update
return sequencersAfterUpdate, nil
}

func (m *Manager) produceBlock(opts ProduceBlockOptions) (*types.Block, *types.Commit, error) {
newHeight := m.State.NextHeight()
lastHeaderHash, lastCommit, err := m.GetPreviousBlockHashes(newHeight)
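`SnapshotSequencerSet` hinges on `types.SequencerListRightOuterJoin`, which, as used here, should return every sequencer in the new set that has no exact match in the last persisted set, so both brand-new and updated sequencers get `UpsertSequencer` consensus messages. A self-contained sketch of those assumed semantics; `Sequencer` and `rightOuterJoin` are simplified stand-ins, not the real types package:

```go
package main

import "fmt"

// Sequencer is a minimal stand-in for types.Sequencer (illustrative only).
type Sequencer struct {
	Address    string
	RewardAddr string
}

// rightOuterJoin mimics the assumed semantics of types.SequencerListRightOuterJoin:
// every element of `right` with no exact match in `left` is part of the diff,
// so brand-new sequencers and sequencers with changed fields both show up.
func rightOuterJoin(left, right []Sequencer) []Sequencer {
	var diff []Sequencer
	for _, r := range right {
		found := false
		for _, l := range left {
			if l == r {
				found = true
				break
			}
		}
		if !found {
			diff = append(diff, r)
		}
	}
	return diff
}

func main() {
	last := []Sequencer{{Address: "seqA", RewardAddr: "addr1"}}
	current := []Sequencer{
		{Address: "seqA", RewardAddr: "addr2"}, // updated reward address
		{Address: "seqB", RewardAddr: "addr3"}, // brand-new sequencer
	}
	// Only the changed/new entries need UpsertSequencer consensus messages.
	fmt.Println(rightOuterJoin(last, current)) // [{seqA addr2} {seqB addr3}]
}
```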
60 changes: 51 additions & 9 deletions block/production_test.go
@@ -340,17 +340,24 @@ func TestUpdateInitialSequencerSet(t *testing.T) {
require.Zero(manager.State.Height())
require.Zero(manager.LastSettlementHeight.Load())

// Simulate updating the sequencer set from SL.
// This will add new consensus msgs to the queue.
// Simulate updating the sequencer set from SL on start
err = manager.UpdateSequencerSetFromSL()
require.NoError(err)

// Produce block and validate that we produced blocks
// Produce block and validate results. We expect to have two consensus messages for the two sequencers
// since the store for the last block sequencer set is empty.
block, _, err := manager.ProduceApplyGossipBlock(ctx, block2.ProduceBlockOptions{AllowEmpty: true})
require.NoError(err)
assert.Greater(t, manager.State.Height(), uint64(0))
assert.Zero(t, manager.LastSettlementHeight.Load())

// Validate the last block sequencer set is persisted in the store
actualSeqSet, err := manager.Store.LoadLastBlockSequencerSet()
require.NoError(err)
require.Len(actualSeqSet, 2)
require.Equal(actualSeqSet[0], proposer)
require.Equal(actualSeqSet[1], sequencer)

// Validate that the block has expected consensus msgs
require.Len(block.Data.ConsensusMessages, 2)

@@ -395,6 +402,23 @@ func TestUpdateInitialSequencerSet(t *testing.T) {
// Verify the result
require.True(proto.Equal(anyMsg1, block.Data.ConsensusMessages[0]))
require.True(proto.Equal(anyMsg2, block.Data.ConsensusMessages[1]))

// Produce one more block and validate results. We expect to have zero consensus messages
// since there are no sequencer set updates.
block, _, err = manager.ProduceApplyGossipBlock(ctx, block2.ProduceBlockOptions{AllowEmpty: true})
require.NoError(err)
assert.Greater(t, manager.State.Height(), uint64(1))
assert.Zero(t, manager.LastSettlementHeight.Load())

// Validate the last block sequencer set is persisted in the store
actualSeqSet, err = manager.Store.LoadLastBlockSequencerSet()
require.NoError(err)
require.Len(actualSeqSet, 2)
require.Equal(actualSeqSet[0], proposer)
require.Equal(actualSeqSet[1], sequencer)

// Validate that the block has expected consensus msgs
require.Len(block.Data.ConsensusMessages, 0)
}

func TestUpdateExistingSequencerSet(t *testing.T) {
@@ -444,23 +468,34 @@ func TestUpdateExistingSequencerSet(t *testing.T) {

// Set the initial sequencer set
manager.Sequencers.Set([]types.Sequencer{proposer, sequencer})
_, err = manager.Store.SaveLastBlockSequencerSet([]types.Sequencer{proposer, sequencer}, nil)
require.NoError(err)

// Check initial assertions
require.Zero(manager.State.Height())
require.Zero(manager.LastSettlementHeight.Load())
initialSequencers := manager.Sequencers.GetAll()
require.Len(initialSequencers, 2)
require.Equal(initialSequencers[0], proposer)
require.Equal(initialSequencers[1], sequencer)
// Memory has the initial sequencer set
initialMemorySequencers := manager.Sequencers.GetAll()
require.Len(initialMemorySequencers, 2)
require.Equal(initialMemorySequencers[0], proposer)
require.Equal(initialMemorySequencers[1], sequencer)
// Store has the initial sequencer set
initialStoreSequencers, err := manager.Store.LoadLastBlockSequencerSet()
require.NoError(err)
require.Len(initialStoreSequencers, 2)
require.Equal(initialStoreSequencers[0], proposer)
require.Equal(initialStoreSequencers[1], sequencer)

// Update one of the sequencers and pass the update to the manager.
// We expect that the manager will update the sequencer set in the state and generate a new consensus msg.
// We expect that the manager will update the sequencer set in memory and
// generate a new consensus msg during block production.
updatedSequencer := sequencer
const newSequencerRewardAddr = "dym1mk7pw34ypusacm29m92zshgxee3yreums8avur"
updatedSequencer.RewardAddr = newSequencerRewardAddr
// GetAllSequencers now returns an updated sequencer
slmock.On("GetAllSequencers").Return([]types.Sequencer{proposer, updatedSequencer}, nil)

// Simulate updating the sequencer set from SL
err = manager.UpdateSequencerSetFromSL()
require.NoError(err)

@@ -472,7 +507,7 @@ func TestUpdateExistingSequencerSet(t *testing.T) {
require.NotEqual(sequencer, sequencers[1])
require.Equal(updatedSequencer, sequencers[1])

// Produce block and validate that we produced blocks
// Produce block and validate results. We expect to have one consensus message for the updated sequencer.
block, _, err := manager.ProduceApplyGossipBlock(ctx, block2.ProduceBlockOptions{AllowEmpty: true})
require.NoError(err)
assert.Greater(t, manager.State.Height(), uint64(0))
Expand Down Expand Up @@ -502,4 +537,11 @@ func TestUpdateExistingSequencerSet(t *testing.T) {

// Verify the result
require.True(proto.Equal(anyMsg1, block.Data.ConsensusMessages[0]))

// Validate the last block sequencer set is persisted in the store
actualStoreSequencers, err := manager.Store.LoadLastBlockSequencerSet()
require.NoError(err)
require.Len(actualStoreSequencers, 2)
require.Equal(actualStoreSequencers[0], proposer)
require.Equal(actualStoreSequencers[1], updatedSequencer)
}
2 changes: 1 addition & 1 deletion block/pruning.go
@@ -32,7 +32,7 @@ func (m *Manager) PruningLoop(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
return nil
case retainHeight := <-m.pruningC:
var pruningHeight uint64
if m.RunMode == RunModeProposer { // do not delete anything that we might submit in future
48 changes: 16 additions & 32 deletions block/sequencers.go
@@ -5,31 +5,36 @@ import (
"context"
"fmt"
"time"

"github.com/dymensionxyz/dymint/types"
)

const (
ProposerMonitorInterval = 3 * time.Minute
)

func (m *Manager) MonitorProposerRotation(ctx context.Context) {
var errRotationRequested = fmt.Errorf("sequencer rotation started. signal to stop production")

func (m *Manager) MonitorProposerRotation(ctx context.Context) error {
ticker := time.NewTicker(ProposerMonitorInterval) // TODO: make this configurable
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
return nil
case <-ticker.C:
next, err := m.SLClient.GetNextProposer()
nextProposer, err := m.SLClient.GetNextProposer()
if err != nil {
m.logger.Error("Check rotation in progress", "err", err)
continue
}
if next != nil {
m.rotate(ctx)
// no rotation in progress
if nextProposer == nil {
continue
}

// we get here once a sequencer rotation signal is received
m.logger.Info("Sequencer rotation started.", "nextSeqAddr", nextProposer.SettlementAddress)
return errRotationRequested
}
}
}
@@ -103,7 +108,7 @@ func (m *Manager) ShouldRotate() (bool, error) {
func (m *Manager) rotate(ctx context.Context) {
// Get Next Proposer from SL. We assume such exists (even if empty proposer) otherwise function wouldn't be called.
nextProposer, err := m.SLClient.GetNextProposer()
if err != nil {
if err != nil || nextProposer == nil {
panic(fmt.Sprintf("rotate: fetch next proposer set from Hub: %v", err))
}

@@ -157,40 +162,19 @@ func (m *Manager) CreateAndPostLastBatch(ctx context.Context, nextSeqHash [32]by
return nil
}

// UpdateSequencerSetFromSL updates the sequencer set from the SL.
// UpdateSequencerSetFromSL updates the sequencer set from the SL. The sequencer set is saved only in memory.
// It will be persisted to the store when the block is produced (only in the proposer mode).
// Proposer is not changed here.
func (m *Manager) UpdateSequencerSetFromSL() error {
seqs, err := m.SLClient.GetAllSequencers()
if err != nil {
return fmt.Errorf("get all sequencers from the hub: %w", err)
}
err = m.HandleSequencerSetUpdate(seqs)
if err != nil {
return fmt.Errorf("handle sequencer set update: %w", err)
}
m.Sequencers.Set(seqs)
m.logger.Debug("Updated bonded sequencer set.", "newSet", m.Sequencers.String())
return nil
}

// HandleSequencerSetUpdate calculates the diff between hub's and current sequencer sets and
// creates consensus messages for all new sequencers. The method updates the current state
// and is not thread-safe. Returns errors on serialization issues.
func (m *Manager) HandleSequencerSetUpdate(newSet []types.Sequencer) error {
actualSequencers := m.Sequencers.GetAll()
// find new (updated) sequencers
newSequencers := types.SequencerListRightOuterJoin(actualSequencers, newSet)
// create consensus msgs for new sequencers
msgs, err := ConsensusMsgsOnSequencerSetUpdate(newSequencers)
if err != nil {
return fmt.Errorf("consensus msgs on sequencers set update: %w", err)
}
// add consensus msgs to the stream
m.Executor.AddConsensusMsgs(msgs...)
// save the new sequencer set to the state
m.Sequencers.Set(newSet)
return nil
}

// UpdateProposerFromSL queries the hub and updates the local dymint state proposer at the current height
func (m *Manager) UpdateProposerFromSL() error {
SLProposer, err := m.SLClient.GetProposerAtHeight(int64(m.State.NextHeight()))
