From 0ceb6692be7bf04a5c338e95feeb2d6c6c36b0b4 Mon Sep 17 00:00:00 2001 From: Michael Tsitrin Date: Tue, 19 Nov 2024 16:51:20 +0200 Subject: [PATCH 1/4] fixed race condition for last block --- block/manager.go | 6 +----- block/modes.go | 18 ++++++++++++++++-- block/sequencers.go | 21 +++++++++++++++------ block/sync.go | 4 ++-- 4 files changed, 34 insertions(+), 15 deletions(-) diff --git a/block/manager.go b/block/manager.go index f8d3a711e..dfe6a45b7 100644 --- a/block/manager.go +++ b/block/manager.go @@ -260,11 +260,7 @@ func (m *Manager) Start(ctx context.Context) error { // Start the settlement sync loop in the background uerrors.ErrGroupGoLog(eg, m.logger, func() error { - err := m.SettlementSyncLoop(ctx) - if err != nil { - m.freezeNode(err) - } - return nil + return m.SettlementSyncLoop(ctx) }) // Monitor sequencer set updates diff --git a/block/modes.go b/block/modes.go index b1ded0381..ca206c1cd 100644 --- a/block/modes.go +++ b/block/modes.go @@ -2,6 +2,7 @@ package block import ( "context" + "errors" "fmt" "github.com/dymensionxyz/dymint/p2p" @@ -69,7 +70,7 @@ func (m *Manager) runAsProposer(ctx context.Context, eg *errgroup.Group) error { return fmt.Errorf("checking should rotate: %w", err) } if shouldRotate { - m.rotate(ctx) + m.rotate(ctx) // panics afterwards } // populate the bytes produced channel @@ -84,8 +85,21 @@ func (m *Manager) runAsProposer(ctx context.Context, eg *errgroup.Group) error { return m.ProduceBlockLoop(ctx, bytesProducedC) }) + // channel to signal sequencer rotation started + rotateSequencerC := make(chan struct{}, 1) // Monitor and handling of the rotation - go m.MonitorProposerRotation(ctx) + uerrors.ErrGroupGoLog(eg, m.logger, func() error { + return m.MonitorProposerRotation(ctx, rotateSequencerC) + }) + + // check wether + go func() { + err = eg.Wait() + // Check if exited due to sequencer rotation signal + if errors.Is(err, errRotationRequested) { + m.rotate(ctx) + } + }() return nil } diff --git a/block/sequencers.go b/block/sequencers.go index 85513cfd1..a8be91f39 100644 --- a/block/sequencers.go +++ b/block/sequencers.go @@ -13,23 +13,32 @@ const ( ProposerMonitorInterval = 3 * time.Minute ) -func (m *Manager) MonitorProposerRotation(ctx context.Context) { +var ( + errRotationRequested = fmt.Errorf("sequencer rotation started. signal to stop production") +) + +func (m *Manager) MonitorProposerRotation(ctx context.Context, rotateC chan struct{}) error { ticker := time.NewTicker(ProposerMonitorInterval) // TODO: make this configurable defer ticker.Stop() for { select { case <-ctx.Done(): - return + return nil case <-ticker.C: - next, err := m.SLClient.GetNextProposer() + nextProposer, err := m.SLClient.GetNextProposer() if err != nil { m.logger.Error("Check rotation in progress", "err", err) continue } - if next != nil { - m.rotate(ctx) + // no rotation in progress + if nextProposer == nil { + continue } + + // we get here once a sequencer rotation signal is received + m.logger.Info("Sequencer rotation started.", "nextSeqAddr", nextProposer.SettlementAddress) + return errRotationRequested } } } @@ -103,7 +112,7 @@ func (m *Manager) ShouldRotate() (bool, error) { func (m *Manager) rotate(ctx context.Context) { // Get Next Proposer from SL. We assume such exists (even if empty proposer) otherwise function wouldn't be called. nextProposer, err := m.SLClient.GetNextProposer() - if err != nil { + if err != nil || nextProposer == nil { panic(fmt.Sprintf("rotate: fetch next proposer set from Hub: %v", err)) } diff --git a/block/sync.go b/block/sync.go index 1f7b6a53e..ff104021a 100644 --- a/block/sync.go +++ b/block/sync.go @@ -49,14 +49,14 @@ func (m *Manager) SettlementSyncLoop(ctx context.Context) error { for { select { case <-ctx.Done(): - return ctx.Err() + return nil case <-m.settlementSyncingC: m.logger.Info("syncing to target height", "targetHeight", m.LastSettlementHeight.Load()) for currH := m.State.NextHeight(); currH <= m.LastSettlementHeight.Load(); currH = m.State.NextHeight() { // if context has been cancelled, stop syncing if ctx.Err() != nil { - return ctx.Err() + return nil } // if we have the block locally, we don't need to fetch it from the DA. // it will only happen in case of rollback. From 9c9ecddd1d1720ab8fd49f91d22c500655d3fb3f Mon Sep 17 00:00:00 2001 From: Michael Tsitrin Date: Tue, 19 Nov 2024 21:02:40 +0200 Subject: [PATCH 2/4] linter --- block/modes.go | 3 +-- block/sequencers.go | 4 +--- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/block/modes.go b/block/modes.go index ca206c1cd..2e38ab9f2 100644 --- a/block/modes.go +++ b/block/modes.go @@ -92,10 +92,9 @@ func (m *Manager) runAsProposer(ctx context.Context, eg *errgroup.Group) error { return m.MonitorProposerRotation(ctx, rotateSequencerC) }) - // check wether go func() { err = eg.Wait() - // Check if exited due to sequencer rotation signal + // Check if loops exited due to sequencer rotation signal if errors.Is(err, errRotationRequested) { m.rotate(ctx) } diff --git a/block/sequencers.go b/block/sequencers.go index a8be91f39..163b26f80 100644 --- a/block/sequencers.go +++ b/block/sequencers.go @@ -13,9 +13,7 @@ const ( ProposerMonitorInterval = 3 * time.Minute ) -var ( - errRotationRequested = fmt.Errorf("sequencer rotation started. signal to stop production") -) +var errRotationRequested = fmt.Errorf("sequencer rotation started. signal to stop production") func (m *Manager) MonitorProposerRotation(ctx context.Context, rotateC chan struct{}) error { ticker := time.NewTicker(ProposerMonitorInterval) // TODO: make this configurable From aaa0bb5f48a6cd9e94fd3dbfbefd72e515772fca Mon Sep 17 00:00:00 2001 From: Michael Tsitrin Date: Tue, 19 Nov 2024 21:03:58 +0200 Subject: [PATCH 3/4] cleanup --- block/modes.go | 4 +--- block/sequencers.go | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/block/modes.go b/block/modes.go index 2e38ab9f2..8910b8636 100644 --- a/block/modes.go +++ b/block/modes.go @@ -85,11 +85,9 @@ func (m *Manager) runAsProposer(ctx context.Context, eg *errgroup.Group) error { return m.ProduceBlockLoop(ctx, bytesProducedC) }) - // channel to signal sequencer rotation started - rotateSequencerC := make(chan struct{}, 1) // Monitor and handling of the rotation uerrors.ErrGroupGoLog(eg, m.logger, func() error { - return m.MonitorProposerRotation(ctx, rotateSequencerC) + return m.MonitorProposerRotation(ctx) }) go func() { diff --git a/block/sequencers.go b/block/sequencers.go index 163b26f80..c6e078b7d 100644 --- a/block/sequencers.go +++ b/block/sequencers.go @@ -15,7 +15,7 @@ const ( var errRotationRequested = fmt.Errorf("sequencer rotation started. signal to stop production") -func (m *Manager) MonitorProposerRotation(ctx context.Context, rotateC chan struct{}) error { +func (m *Manager) MonitorProposerRotation(ctx context.Context) error { ticker := time.NewTicker(ProposerMonitorInterval) // TODO: make this configurable defer ticker.Stop() From eae6ca6e5989fb9df43f7510890529ad8036302d Mon Sep 17 00:00:00 2001 From: Michael Tsitrin Date: Wed, 20 Nov 2024 13:22:17 +0200 Subject: [PATCH 4/4] cleanup --- block/fork.go | 2 +- block/modes.go | 3 +++ block/produce.go | 2 +- block/pruning.go | 2 +- block/submit.go | 6 +++--- block/validate.go | 2 +- 6 files changed, 10 insertions(+), 7 deletions(-) diff --git a/block/fork.go b/block/fork.go index a7a86e86c..ba38ba875 100644 --- a/block/fork.go +++ b/block/fork.go @@ -37,7 +37,7 @@ func (m *Manager) MonitorForkUpdateLoop(ctx context.Context) error { } select { case <-ctx.Done(): - return ctx.Err() + return nil case <-ticker.C: } } diff --git a/block/modes.go b/block/modes.go index 8910b8636..3ddb94433 100644 --- a/block/modes.go +++ b/block/modes.go @@ -95,6 +95,9 @@ func (m *Manager) runAsProposer(ctx context.Context, eg *errgroup.Group) error { // Check if loops exited due to sequencer rotation signal if errors.Is(err, errRotationRequested) { m.rotate(ctx) + } else if err != nil { + m.logger.Error("block manager exited with error", "error", err) + m.freezeNode(err) } }() diff --git a/block/produce.go b/block/produce.go index ef723286e..8365ef1d3 100644 --- a/block/produce.go +++ b/block/produce.go @@ -38,7 +38,7 @@ func (m *Manager) ProduceBlockLoop(ctx context.Context, bytesProducedC chan int) for { select { case <-ctx.Done(): - return ctx.Err() + return nil case <-ticker.C: // Only produce if I'm the current rollapp proposer. if !m.AmIProposerOnRollapp() { diff --git a/block/pruning.go b/block/pruning.go index 401492c25..30f44802b 100644 --- a/block/pruning.go +++ b/block/pruning.go @@ -32,7 +32,7 @@ func (m *Manager) PruningLoop(ctx context.Context) error { for { select { case <-ctx.Done(): - return ctx.Err() + return nil case retainHeight := <-m.pruningC: var pruningHeight uint64 if m.RunMode == RunModeProposer { // do not delete anything that we might submit in future diff --git a/block/submit.go b/block/submit.go index 9facdeef4..0440b0cd3 100644 --- a/block/submit.go +++ b/block/submit.go @@ -65,13 +65,13 @@ func SubmitLoopInner( // we block here until we get a progress nudge from the submitter thread select { case <-ctx.Done(): - return ctx.Err() + return nil case <-trigger.C: } } else { select { case <-ctx.Done(): - return ctx.Err() + return nil case n := <-bytesProduced: pendingBytes.Add(uint64(n)) logger.Debug("Added bytes produced to bytes pending submission counter.", "bytes added", n, "pending", pendingBytes.Load()) @@ -91,7 +91,7 @@ func SubmitLoopInner( for { select { case <-ctx.Done(): - return ctx.Err() + return nil case <-ticker.C: case <-submitter.C: } diff --git a/block/validate.go b/block/validate.go index 281f49f44..825274747 100644 --- a/block/validate.go +++ b/block/validate.go @@ -27,7 +27,7 @@ func (m *Manager) SettlementValidateLoop(ctx context.Context) error { for { select { case <-ctx.Done(): - return ctx.Err() + return nil case <-m.settlementValidationC: targetValidationHeight := min(m.LastSettlementHeight.Load(), m.State.Height()) m.logger.Info("validating state updates to target height", "targetHeight", targetValidationHeight)