From 04b13a0b78feda56a2ac16f8fdc49f1e0d5f1560 Mon Sep 17 00:00:00 2001
From: Daniel T <30197399+danwt@users.noreply.github.com>
Date: Thu, 16 May 2024 19:55:11 +0100
Subject: [PATCH] fix(manager): removes outdated syncTarget field (#854)

---
 block/block.go        |   8 ++--
 block/gossip.go       |   6 +--
 block/manager.go      | 107 ++++++++++++++++++++++++------------------
 block/manager_test.go |  12 ++---
 block/pruning.go      |  16 +++----
 block/retriever.go    |  17 ++++---
 block/submit.go       |   5 +-
 block/submit_test.go  |  24 +++++-----
 block/synctarget.go   |  23 +++++----
 store/pruning.go      |   6 ++-
 store/pruning_test.go |   5 ++
 store/storeIface.go   |   1 -
 12 files changed, 126 insertions(+), 104 deletions(-)

diff --git a/block/block.go b/block/block.go
index 2785d7719..286f0ec0a 100644
--- a/block/block.go
+++ b/block/block.go
@@ -90,8 +90,8 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
 	}
 	// Prune old heights, if requested by ABCI app.
-	if retainHeight > 0 {
-		_, err := m.pruneBlocks(uint64(retainHeight))
+	if 0 < retainHeight {
+		err = m.pruneBlocks(uint64(retainHeight))
 		if err != nil {
 			m.logger.Error("prune blocks", "retain_height", retainHeight, "err", err)
 		}
 	}
@@ -114,8 +114,8 @@ func (m *Manager) isHeightAlreadyApplied(blockHeight uint64) (bool, error) {
 }
 
 func (m *Manager) attemptApplyCachedBlocks() error {
-	m.retrieverMutex.Lock()
-	defer m.retrieverMutex.Unlock()
+	m.retrieverMu.Lock()
+	defer m.retrieverMu.Unlock()
 
 	for {
 		expectedHeight := m.State.NextHeight()
diff --git a/block/gossip.go b/block/gossip.go
index cd8450e1a..021894362 100644
--- a/block/gossip.go
+++ b/block/gossip.go
@@ -14,11 +14,11 @@ func (m *Manager) onNewGossipedBlock(event pubsub.Message) {
 	eventData, _ := event.Data().(p2p.GossipedBlock)
 	block := eventData.Block
 	commit := eventData.Commit
-	m.retrieverMutex.Lock() // needed to protect blockCache access
+	m.retrieverMu.Lock() // needed to protect blockCache access
 	_, found := m.blockCache[block.Header.Height]
 	// It is not strictly necessary to return early, for correctness, but doing so helps us avoid mutex pressure and unnecessary repeated attempts to apply cached blocks
 	if found {
-		m.retrieverMutex.Unlock()
+		m.retrieverMu.Unlock()
 		return
 	}
 
@@ -31,7 +31,7 @@ func (m *Manager) onNewGossipedBlock(event pubsub.Message) {
 			Commit: &commit,
 		}
 	}
-	m.retrieverMutex.Unlock() // have to give this up as it's locked again in attempt apply, and we're not re-entrant
+	m.retrieverMu.Unlock() // have to give this up as it's locked again in attempt apply, and we're not re-entrant
 
 	err := m.attemptApplyCachedBlocks()
 	if err != nil {
diff --git a/block/manager.go b/block/manager.go
index 55dae12ae..ea67eea4b 100644
--- a/block/manager.go
+++ b/block/manager.go
@@ -31,6 +31,8 @@ import (
 
 // Manager is responsible for aggregating transactions into blocks.
 type Manager struct {
+	logger types.Logger
+
 	// Configuration
 	Conf    config.BlockManagerConfig
 	Genesis *tmtypes.GenesisDoc
@@ -47,25 +49,30 @@ type Manager struct {
 	DAClient da.DataAvailabilityLayerClient
 	SLClient settlement.LayerI
 
-	// Data retrieval
-	Retriever       da.BatchRetriever
-	SyncTargetDiode diodes.Diode
-	SyncTarget      atomic.Uint64
-
-	// Block production
-	producedSizeCh chan uint64 // channel for the producer to report the size of the block it produced
+	/*
+		Production
+	*/
+	producedSizeCh chan uint64 // for the producer to report the size of the block it produced
 
-	// Submitter
+	/*
+		Submission
+	*/
 	AccumulatedBatchSize atomic.Uint64
+	// The last height which was submitted to both sublayers, that we know of. When we produce new batches, we will
+	// start at this height + 1. Note: it is mostly accessed by one thread (the submitter), which alone would not
+	// require synchronization, but it is ALSO used by the producer, because the producer needs to check if it can
+	// prune blocks and it won't prune anything that might be submitted in the future. Therefore, it must be atomic.
+	LastSubmittedHeight atomic.Uint64
 
 	/*
-		Protect against processing two blocks at once when there are two routines handling incoming gossiped blocks,
-		and incoming DA blocks, respectively.
+		Retrieval
 	*/
-	retrieverMutex sync.Mutex
-
-	logger types.Logger
-
+	// Protect against processing two blocks at once when there are two routines handling incoming gossiped blocks,
+	// and incoming DA blocks, respectively.
+	retrieverMu sync.Mutex
+	Retriever   da.BatchRetriever
+	// get the next target height to sync local state to
+	targetSyncHeight diodes.Diode
 	// Cached blocks and commits for applying at future heights. The blocks may not be valid, because
 	// we can only do full validation in sequential order.
 	blockCache map[uint64]CachedBlock
@@ -101,21 +108,21 @@ func NewManager(
 	}
 
 	agg := &Manager{
-		Pubsub:          pubsub,
-		p2pClient:       p2pClient,
-		ProposerKey:     proposerKey,
-		Conf:            conf,
-		Genesis:         genesis,
-		State:           s,
-		Store:           store,
-		Executor:        exec,
-		DAClient:        dalc,
-		SLClient:        settlementClient,
-		Retriever:       dalc.(da.BatchRetriever),
-		SyncTargetDiode: diodes.NewOneToOne(1, nil),
-		producedSizeCh:  make(chan uint64),
-		logger:          logger,
-		blockCache:      make(map[uint64]CachedBlock),
+		Pubsub:           pubsub,
+		p2pClient:        p2pClient,
+		ProposerKey:      proposerKey,
+		Conf:             conf,
+		Genesis:          genesis,
+		State:            s,
+		Store:            store,
+		Executor:         exec,
+		DAClient:         dalc,
+		SLClient:         settlementClient,
+		Retriever:        dalc.(da.BatchRetriever),
+		targetSyncHeight: diodes.NewOneToOne(1, nil),
+		producedSizeCh:   make(chan uint64),
+		logger:           logger,
+		blockCache:       make(map[uint64]CachedBlock),
 	}
 
 	return agg, nil
@@ -125,13 +132,11 @@ func NewManager(
 func (m *Manager) Start(ctx context.Context) error {
 	m.logger.Info("Starting the block manager")
 
-	slProposerKey := m.SLClient.GetProposer().PublicKey.Bytes()
-	localProposerKey, err := m.ProposerKey.GetPublic().Raw()
+	isSequencer, err := m.IsSequencerVerify()
 	if err != nil {
-		return fmt.Errorf("get local node public key: %w", err)
+		return err
 	}
 
-	isSequencer := bytes.Equal(slProposerKey, localProposerKey)
 	m.logger.Info("Starting block manager", "isSequencer", isSequencer)
 
 	// Check if InitChain flow is needed
@@ -159,39 +164,49 @@ func (m *Manager) Start(ctx context.Context) error {
 		go m.SubmitLoop(ctx)
 	} else {
 		go m.RetrieveLoop(ctx)
-		go m.SyncTargetLoop(ctx)
+		go m.SyncToTargetHeightLoop(ctx)
 	}
 	return nil
 }
 
+func (m *Manager) IsSequencerVerify() (bool, error) {
+	slProposerKey := m.SLClient.GetProposer().PublicKey.Bytes()
+	localProposerKey, err := m.ProposerKey.GetPublic().Raw()
+	if err != nil {
+		return false, fmt.Errorf("get local node public key: %w", err)
+	}
+	return bytes.Equal(slProposerKey, localProposerKey), nil
+}
+
+func (m *Manager) IsSequencer() bool {
+	ret, _ := m.IsSequencerVerify()
+	return ret
+}
+
+func (m *Manager) NextHeightToSubmit() uint64 {
+	return m.LastSubmittedHeight.Load() + 1
+}
+
 // syncBlockManager enforces the node to be synced on initial run.
 func (m *Manager) syncBlockManager() error {
 	res, err := m.SLClient.RetrieveBatch()
 	if errors.Is(err, gerr.ErrNotFound) {
 		// The SL hasn't got any batches for this chain yet.
m.logger.Info("No batches for chain found in SL.") - m.SyncTarget.Store(uint64(m.Genesis.InitialHeight - 1)) + m.LastSubmittedHeight.Store(uint64(m.Genesis.InitialHeight - 1)) return nil } if err != nil { // TODO: separate between fresh rollapp and non-registered rollapp return err } - // Set the syncTarget according to the result - m.SyncTarget.Store(res.EndHeight) - err = m.syncUntilTarget(res.EndHeight) + m.LastSubmittedHeight.Store(res.EndHeight) + err = m.syncToTargetHeight(res.EndHeight) if err != nil { return err } - m.logger.Info("Synced.", "current height", m.State.Height(), "syncTarget", m.SyncTarget.Load()) + m.logger.Info("Synced.", "current height", m.State.Height(), "last submitted height", m.LastSubmittedHeight.Load()) return nil } - -// UpdateSyncParams updates the sync target and state index if necessary -func (m *Manager) UpdateSyncParams(endHeight uint64) { - types.RollappHubHeightGauge.Set(float64(endHeight)) - m.logger.Info("SyncTarget updated", "syncTarget", endHeight) - m.SyncTarget.Store(endHeight) -} diff --git a/block/manager_test.go b/block/manager_test.go index 4faef569a..457b1de05 100644 --- a/block/manager_test.go +++ b/block/manager_test.go @@ -118,9 +118,8 @@ func TestProduceOnlyAfterSynced(t *testing.T) { t.Log("Taking the manager out of sync by submitting a batch") - syncTarget := manager.SyncTarget.Load() numBatchesToAdd := 2 - nextBatchStartHeight := syncTarget + 1 + nextBatchStartHeight := manager.NextHeightToSubmit() var batch *types.Batch for i := 0; i < numBatchesToAdd; i++ { batch, err = testutil.GenerateBatch(nextBatchStartHeight, nextBatchStartHeight+uint64(testutil.DefaultTestBatchSize-1), manager.ProposerKey) @@ -135,7 +134,7 @@ func TestProduceOnlyAfterSynced(t *testing.T) { } // Initially sync target is 0 - assert.Zero(t, manager.SyncTarget.Load()) + assert.Zero(t, manager.LastSubmittedHeight.Load()) assert.True(t, manager.State.Height() == 0) // enough time to sync and produce blocks @@ -149,7 +148,7 @@ func TestProduceOnlyAfterSynced(t *testing.T) { assert.NoError(t, err) }() <-ctx.Done() - assert.Equal(t, batch.EndHeight, manager.SyncTarget.Load()) + assert.Equal(t, batch.EndHeight, manager.LastSubmittedHeight.Load()) // validate that we produced blocks assert.Greater(t, manager.State.Height(), batch.EndHeight) } @@ -353,7 +352,7 @@ func TestCreateNextDABatchWithBytesLimit(t *testing.T) { } // Call createNextDABatch function - startHeight := manager.SyncTarget.Load() + 1 + startHeight := manager.NextHeightToSubmit() endHeight := startHeight + uint64(tc.blocksToProduce) - 1 batch, err := manager.CreateNextBatchToSubmit(startHeight, endHeight) assert.NoError(err) @@ -397,8 +396,7 @@ func TestDAFetch(t *testing.T) { app.On("Commit", mock.Anything).Return(abci.ResponseCommit{Data: commitHash[:]}) - syncTarget := manager.SyncTarget.Load() - nextBatchStartHeight := syncTarget + 1 + nextBatchStartHeight := manager.NextHeightToSubmit() batch, err := testutil.GenerateBatch(nextBatchStartHeight, nextBatchStartHeight+uint64(testutil.DefaultTestBatchSize-1), manager.ProposerKey) require.NoError(err) daResultSubmitBatch := manager.DAClient.SubmitBatch(batch) diff --git a/block/pruning.go b/block/pruning.go index 70977cfc9..4ce9ffd2d 100644 --- a/block/pruning.go +++ b/block/pruning.go @@ -2,18 +2,18 @@ package block import ( "fmt" -) -func (m *Manager) pruneBlocks(retainHeight uint64) (uint64, error) { - syncTarget := m.SyncTarget.Load() + "github.com/dymensionxyz/dymint/gerr" +) - if retainHeight > syncTarget { - return 0, fmt.Errorf("cannot prune 
uncommitted blocks") +func (m *Manager) pruneBlocks(retainHeight uint64) error { + if m.IsSequencer() && retainHeight <= m.NextHeightToSubmit() { // do not delete anything that we might submit in future + return fmt.Errorf("cannot prune blocks before they have been submitted: %d: %w", retainHeight, gerr.ErrInvalidArgument) } pruned, err := m.Store.PruneBlocks(m.State.BaseHeight, retainHeight) if err != nil { - return 0, fmt.Errorf("prune block store: %w", err) + return fmt.Errorf("prune block store: %w", err) } // TODO: prune state/indexer and state/txindexer?? @@ -21,9 +21,9 @@ func (m *Manager) pruneBlocks(retainHeight uint64) (uint64, error) { m.State.BaseHeight = retainHeight _, err = m.Store.SaveState(m.State, nil) if err != nil { - return 0, fmt.Errorf("save state: %w", err) + return fmt.Errorf("save state: %w", err) } m.logger.Info("pruned blocks", "pruned", pruned, "retain_height", retainHeight) - return pruned, nil + return nil } diff --git a/block/retriever.go b/block/retriever.go index 240d1a902..30bbb09df 100644 --- a/block/retriever.go +++ b/block/retriever.go @@ -15,17 +15,16 @@ import ( // fetching batches from the settlement layer and then fetching the actual blocks // from the DA. func (m *Manager) RetrieveLoop(ctx context.Context) { - m.logger.Info("started retrieve loop") - syncTargetPoller := diodes.NewPoller(m.SyncTargetDiode, diodes.WithPollingContext(ctx)) + m.logger.Info("Started retrieve loop.") + p := diodes.NewPoller(m.targetSyncHeight, diodes.WithPollingContext(ctx)) for { select { case <-ctx.Done(): return default: - // Get only the latest sync target - targetHeight := syncTargetPoller.Next() - err := m.syncUntilTarget(*(*uint64)(targetHeight)) + targetHeight := p.Next() // We only care about the latest one + err := m.syncToTargetHeight(*(*uint64)(targetHeight)) if err != nil { panic(fmt.Errorf("sync until target: %w", err)) } @@ -33,10 +32,10 @@ func (m *Manager) RetrieveLoop(ctx context.Context) { } } -// syncUntilTarget syncs blocks until the target height is reached. +// syncToTargetHeight syncs blocks until the target height is reached. // It fetches the batches from the settlement, gets the DA height and gets // the actual blocks from the DA. -func (m *Manager) syncUntilTarget(targetHeight uint64) error { +func (m *Manager) syncToTargetHeight(targetHeight uint64) error { for currH := m.State.Height(); currH < targetHeight; currH = m.State.Height() { // It's important that we query the state index before fetching the batch, rather @@ -101,8 +100,8 @@ func (m *Manager) ProcessNextDABatch(daMetaData *da.DASubmitMetaData) error { m.logger.Debug("retrieved batches", "n", len(batchResp.Batches), "daHeight", daMetaData.Height) - m.retrieverMutex.Lock() - defer m.retrieverMutex.Unlock() + m.retrieverMu.Lock() + defer m.retrieverMu.Unlock() for _, batch := range batchResp.Batches { for i, block := range batch.Blocks { diff --git a/block/submit.go b/block/submit.go index a1567addf..d798f48d9 100644 --- a/block/submit.go +++ b/block/submit.go @@ -111,7 +111,7 @@ func (m *Manager) AccumulatedDataLoop(ctx context.Context, toSubmit chan struct{ func (m *Manager) HandleSubmissionTrigger() error { // Load current sync target and height to determine if new blocks are available for submission. 
-	startHeight := m.SyncTarget.Load() + 1
+	startHeight := m.NextHeightToSubmit()
 	endHeightInclusive := m.State.Height()
 
 	if endHeightInclusive < startHeight {
@@ -137,7 +138,8 @@ func (m *Manager) HandleSubmissionTrigger() error {
 	}
 
 	m.logger.Info("Submitted batch to SL.", "start height", resultSubmitToDA, "end height", nextBatch.EndHeight)
-	m.UpdateSyncParams(actualEndHeight)
+	types.RollappHubHeightGauge.Set(float64(actualEndHeight))
+	m.LastSubmittedHeight.Store(actualEndHeight)
 
 	return nil
 }
diff --git a/block/submit_test.go b/block/submit_test.go
index 31667df46..0e62cfcaf 100644
--- a/block/submit_test.go
+++ b/block/submit_test.go
@@ -39,17 +39,17 @@ func TestBatchSubmissionHappyFlow(t *testing.T) {
 	// Check initial assertions
 	initialHeight := uint64(0)
 	require.Zero(manager.State.Height())
-	require.Zero(manager.SyncTarget.Load())
+	require.Zero(manager.LastSubmittedHeight.Load())
 
 	// Produce block and validate that we produced blocks
 	_, _, err = manager.ProduceAndGossipBlock(ctx, true)
 	require.NoError(err)
 	assert.Greater(t, manager.State.Height(), initialHeight)
-	assert.Zero(t, manager.SyncTarget.Load())
+	assert.Zero(t, manager.LastSubmittedHeight.Load())
 
 	// submit and validate sync target
 	manager.HandleSubmissionTrigger()
-	assert.EqualValues(t, manager.State.Height(), manager.SyncTarget.Load())
+	assert.EqualValues(t, manager.State.Height(), manager.LastSubmittedHeight.Load())
 }
 
 func TestBatchSubmissionFailedSubmission(t *testing.T) {
@@ -86,22 +86,22 @@ func TestBatchSubmissionFailedSubmission(t *testing.T) {
 	// Check initial assertions
 	initialHeight := uint64(0)
 	require.Zero(manager.State.Height())
-	require.Zero(manager.SyncTarget.Load())
+	require.Zero(manager.LastSubmittedHeight.Load())
 
 	// Produce block and validate that we produced blocks
 	_, _, err = manager.ProduceAndGossipBlock(ctx, true)
 	require.NoError(err)
 	assert.Greater(t, manager.State.Height(), initialHeight)
-	assert.Zero(t, manager.SyncTarget.Load())
+	assert.Zero(t, manager.LastSubmittedHeight.Load())
 
 	// try to submit, we expect failure
-	mockLayerI.On("SubmitBatch", mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("Failed to submit batch")).Once()
+	mockLayerI.On("SubmitBatch", mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("submit batch")).Once()
 	assert.Error(t, manager.HandleSubmissionTrigger())
 
 	// try to submit again, we expect success
 	mockLayerI.On("SubmitBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
 	manager.HandleSubmissionTrigger()
-	assert.EqualValues(t, manager.State.Height(), manager.SyncTarget.Load())
+	assert.EqualValues(t, manager.State.Height(), manager.LastSubmittedHeight.Load())
 }
 
 // TestSubmissionByTime tests the submission trigger by time
@@ -135,7 +135,7 @@ func TestSubmissionByTime(t *testing.T) {
 	// Check initial height
 	initialHeight := uint64(0)
 	require.Equal(initialHeight, manager.State.Height())
-	require.Zero(manager.SyncTarget.Load())
+	require.Zero(manager.LastSubmittedHeight.Load())
 
 	var wg sync.WaitGroup
 	mCtx, cancel := context.WithTimeout(context.Background(), 2*submitTimeout)
@@ -154,7 +154,7 @@ func TestSubmissionByTime(t *testing.T) {
 
 	}()
 	wg.Wait() // Wait for all goroutines to finish
-	require.True(manager.SyncTarget.Load() > 0)
+	require.True(0 < manager.LastSubmittedHeight.Load())
 }
 
 // TestSubmissionByBatchSize tests the submission trigger by batch size
@@ -201,6 +201,7 @@ func TestSubmissionByBatchSize(t *testing.T) {
 		}()
 
 		go func() {
+			assert.Zero(manager.LastSubmittedHeight.Load())
 			manager.SubmitLoop(ctx)
 			wg.Done() // Decrease counter when this goroutine finishes
 		}()
@@ -210,14 +211,13 @@ func TestSubmissionByBatchSize(t *testing.T) {
 		// assert block produced but nothing submitted yet
 		assert.Greater(manager.State.Height(), uint64(0))
 		assert.Greater(manager.AccumulatedBatchSize.Load(), uint64(0))
-		assert.Zero(manager.SyncTarget.Load())
 
 		wg.Wait() // Wait for all goroutines to finish
 
 		if c.expectedSubmission {
-			assert.Positive(manager.SyncTarget.Load())
+			assert.Positive(manager.LastSubmittedHeight.Load())
 		} else {
-			assert.Zero(manager.SyncTarget.Load())
+			assert.Zero(manager.LastSubmittedHeight.Load())
 		}
 	}
 }
diff --git a/block/synctarget.go b/block/synctarget.go
index c101fddbf..5f3341662 100644
--- a/block/synctarget.go
+++ b/block/synctarget.go
@@ -3,14 +3,15 @@ package block
 import (
 	"context"
 
+	"github.com/dymensionxyz/dymint/types"
+
 	"code.cloudfoundry.org/go-diodes"
 	"github.com/dymensionxyz/dymint/settlement"
 )
 
-// SyncTargetLoop is responsible for getting real time updates about settlement batch submissions.
-// For non sequencer: updating the sync target which will be used by retrieveLoop to sync until this target.
-// It publishes new sync height targets which will then be synced by another process.
-func (m *Manager) SyncTargetLoop(ctx context.Context) {
+// SyncToTargetHeightLoop gets real time updates about settlement batch submissions and sends the latest height downstream
+// to be retrieved by another process which will pull the data.
+func (m *Manager) SyncToTargetHeightLoop(ctx context.Context) {
 	m.logger.Info("Started sync target loop")
 	subscription, err := m.Pubsub.Subscribe(ctx, "syncTargetLoop", settlement.EventQueryNewSettlementBatchAccepted)
 	if err != nil {
@@ -24,19 +25,21 @@ func (m *Manager) SyncTargetLoop(ctx context.Context) {
 			return
 		case event := <-subscription.Out():
 			eventData, _ := event.Data().(*settlement.EventDataNewBatchAccepted)
+			h := eventData.EndHeight
 
-			if eventData.EndHeight <= m.State.Height() {
+			if h <= m.State.Height() {
 				m.logger.Debug(
 					"syncTargetLoop: received new settlement batch accepted with batch end height <= current store height, skipping.",
-					"height",
-					eventData.EndHeight,
-					"currentHeight",
+					"target sync height (batch end height)",
+					h,
+					"current store height",
 					m.State.Height(),
 				)
 				continue
 			}
-			m.UpdateSyncParams(eventData.EndHeight)
-			m.SyncTargetDiode.Set(diodes.GenericDataType(&eventData.EndHeight))
+			types.RollappHubHeightGauge.Set(float64(h))
+			m.targetSyncHeight.Set(diodes.GenericDataType(&h))
+			m.logger.Info("Set new target sync height", "height", h)
 		case <-subscription.Cancelled():
 			m.logger.Error("syncTargetLoop subscription canceled")
 			return
diff --git a/store/pruning.go b/store/pruning.go
index cebec016e..e15bab0cb 100644
--- a/store/pruning.go
+++ b/store/pruning.go
@@ -2,16 +2,18 @@ package store
 
 import (
 	"fmt"
+
+	"github.com/dymensionxyz/dymint/gerr"
 )
 
 // PruneBlocks removes block up to (but not including) a height. It returns number of blocks pruned.
 func (s *DefaultStore) PruneBlocks(from, to uint64) (uint64, error) {
 	if from <= 0 {
-		return 0, fmt.Errorf("from height must be greater than 0")
+		return 0, fmt.Errorf("from height must be greater than 0: %w", gerr.ErrInvalidArgument)
 	}
 
 	if to <= from {
-		return 0, fmt.Errorf("to height (%d) must be greater than from height (%d)", to, from)
+		return 0, fmt.Errorf("to height must be greater than from height: to: %d: from: %d: %w", to, from, gerr.ErrInvalidArgument)
 	}
 
 	pruned := uint64(0)
diff --git a/store/pruning_test.go b/store/pruning_test.go
index 43dcd774e..43f9c0e7a 100644
--- a/store/pruning_test.go
+++ b/store/pruning_test.go
@@ -47,6 +47,11 @@ func TestStorePruning(t *testing.T) {
 			testutil.GetRandomBlock(2, 0),
 			testutil.GetRandomBlock(3, 0),
 		}, 3, 3, true},
+		{"to height exceeds actual block cnt", []*types.Block{
+			testutil.GetRandomBlock(1, 0),
+			testutil.GetRandomBlock(2, 0),
+			testutil.GetRandomBlock(3, 0),
+		}, 2, 5, false}, // it shouldn't error, it should just no-op
 	}
 	for _, c := range cases {
 		t.Run(c.name, func(t *testing.T) {
diff --git a/store/storeIface.go b/store/storeIface.go
index 3acb988e6..2cd285d16 100644
--- a/store/storeIface.go
+++ b/store/storeIface.go
@@ -70,6 +70,5 @@ type Store interface {
 
 	LoadValidators(height uint64) (*tmtypes.ValidatorSet, error)
 
-	// Pruning functions
 	PruneBlocks(from, to uint64) (uint64, error)
 }