diff --git a/block/fork.go b/block/fork.go index 26bcab454..b06cb549b 100644 --- a/block/fork.go +++ b/block/fork.go @@ -36,7 +36,7 @@ func (m *Manager) MonitorForkUpdateLoop(ctx context.Context) error { } select { case <-ctx.Done(): - return ctx.Err() + return nil case <-ticker.C: } } diff --git a/block/manager.go b/block/manager.go index 71794ce62..7e66ba870 100644 --- a/block/manager.go +++ b/block/manager.go @@ -266,11 +266,7 @@ func (m *Manager) Start(ctx context.Context) error { // Start the settlement sync loop in the background uerrors.ErrGroupGoLog(eg, m.logger, func() error { - err := m.SettlementSyncLoop(ctx) - if err != nil { - m.freezeNode(err) - } - return nil + return m.SettlementSyncLoop(ctx) }) // Monitor sequencer set updates diff --git a/block/modes.go b/block/modes.go index b1ded0381..3ddb94433 100644 --- a/block/modes.go +++ b/block/modes.go @@ -2,6 +2,7 @@ package block import ( "context" + "errors" "fmt" "github.com/dymensionxyz/dymint/p2p" @@ -69,7 +70,7 @@ func (m *Manager) runAsProposer(ctx context.Context, eg *errgroup.Group) error { return fmt.Errorf("checking should rotate: %w", err) } if shouldRotate { - m.rotate(ctx) + m.rotate(ctx) // panics afterwards } // populate the bytes produced channel @@ -85,7 +86,20 @@ func (m *Manager) runAsProposer(ctx context.Context, eg *errgroup.Group) error { }) // Monitor and handling of the rotation - go m.MonitorProposerRotation(ctx) + uerrors.ErrGroupGoLog(eg, m.logger, func() error { + return m.MonitorProposerRotation(ctx) + }) + + go func() { + err = eg.Wait() + // Check if loops exited due to sequencer rotation signal + if errors.Is(err, errRotationRequested) { + m.rotate(ctx) + } else if err != nil { + m.logger.Error("block manager exited with error", "error", err) + m.freezeNode(err) + } + }() return nil } diff --git a/block/produce.go b/block/produce.go index c6a2e5352..d41bd41a0 100644 --- a/block/produce.go +++ b/block/produce.go @@ -38,7 +38,7 @@ func (m *Manager) ProduceBlockLoop(ctx context.Context, bytesProducedC chan int) for { select { case <-ctx.Done(): - return ctx.Err() + return nil case <-ticker.C: // Only produce if I'm the current rollapp proposer. if !m.AmIProposerOnRollapp() { diff --git a/block/pruning.go b/block/pruning.go index 401492c25..30f44802b 100644 --- a/block/pruning.go +++ b/block/pruning.go @@ -32,7 +32,7 @@ func (m *Manager) PruningLoop(ctx context.Context) error { for { select { case <-ctx.Done(): - return ctx.Err() + return nil case retainHeight := <-m.pruningC: var pruningHeight uint64 if m.RunMode == RunModeProposer { // do not delete anything that we might submit in future diff --git a/block/sequencers.go b/block/sequencers.go index ca3dff074..a94cd8930 100644 --- a/block/sequencers.go +++ b/block/sequencers.go @@ -13,23 +13,30 @@ const ( ProposerMonitorInterval = 3 * time.Minute ) -func (m *Manager) MonitorProposerRotation(ctx context.Context) { +var errRotationRequested = fmt.Errorf("sequencer rotation started. signal to stop production") + +func (m *Manager) MonitorProposerRotation(ctx context.Context) error { ticker := time.NewTicker(ProposerMonitorInterval) // TODO: make this configurable defer ticker.Stop() for { select { case <-ctx.Done(): - return + return nil case <-ticker.C: - next, err := m.SLClient.GetNextProposer() + nextProposer, err := m.SLClient.GetNextProposer() if err != nil { m.logger.Error("Check rotation in progress", "err", err) continue } - if next != nil { - m.rotate(ctx) + // no rotation in progress + if nextProposer == nil { + continue } + + // we get here once a sequencer rotation signal is received + m.logger.Info("Sequencer rotation started.", "nextSeqAddr", nextProposer.SettlementAddress) + return errRotationRequested } } } @@ -103,7 +110,7 @@ func (m *Manager) ShouldRotate() (bool, error) { func (m *Manager) rotate(ctx context.Context) { // Get Next Proposer from SL. We assume such exists (even if empty proposer) otherwise function wouldn't be called. nextProposer, err := m.SLClient.GetNextProposer() - if err != nil { + if err != nil || nextProposer == nil { panic(fmt.Sprintf("rotate: fetch next proposer set from Hub: %v", err)) } diff --git a/block/submit.go b/block/submit.go index 2105578d6..46c70b101 100644 --- a/block/submit.go +++ b/block/submit.go @@ -65,7 +65,7 @@ func SubmitLoopInner( for { select { case <-ctx.Done(): - return ctx.Err() + return nil case n := <-bytesProduced: pendingBytes.Add(uint64(n)) logger.Debug("Added bytes produced to bytes pending submission counter.", "bytes added", n, "pending", pendingBytes.Load()) @@ -78,7 +78,7 @@ func SubmitLoopInner( // we block here until we get a progress nudge from the submitter thread select { case <-ctx.Done(): - return ctx.Err() + return nil case <-trigger.C: } } @@ -92,7 +92,7 @@ func SubmitLoopInner( for { select { case <-ctx.Done(): - return ctx.Err() + return nil case <-ticker.C: case <-submitter.C: } diff --git a/block/sync.go b/block/sync.go index 319411ba5..96923ab88 100644 --- a/block/sync.go +++ b/block/sync.go @@ -49,14 +49,14 @@ func (m *Manager) SettlementSyncLoop(ctx context.Context) error { for { select { case <-ctx.Done(): - return ctx.Err() + return nil case <-m.settlementSyncingC: m.logger.Info("syncing to target height", "targetHeight", m.LastSettlementHeight.Load()) for currH := m.State.NextHeight(); currH <= m.LastSettlementHeight.Load(); currH = m.State.NextHeight() { // if context has been cancelled, stop syncing if ctx.Err() != nil { - return ctx.Err() + return nil } // if we have the block locally, we don't need to fetch it from the DA. // it will only happen in case of rollback. diff --git a/block/validate.go b/block/validate.go index 939dd7ee1..d2a86d07f 100644 --- a/block/validate.go +++ b/block/validate.go @@ -27,7 +27,7 @@ func (m *Manager) SettlementValidateLoop(ctx context.Context) error { for { select { case <-ctx.Done(): - return ctx.Err() + return nil case <-m.settlementValidationC: targetValidationHeight := min(m.LastSettlementHeight.Load(), m.State.Height()) m.logger.Info("validating state updates to target height", "targetHeight", targetValidationHeight)