
Commit

fix(manager): removes outdated syncTarget field (#854)
danwt authored May 16, 2024
1 parent f7b9383 commit 04b13a0
Showing 12 changed files with 126 additions and 104 deletions.
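For orientation: the commit splits the old SyncTarget atomic into LastSubmittedHeight (what the sequencer has already submitted) and targetSyncHeight (the height a syncing node should catch up to). A minimal, runnable sketch of the submission-side convention the diff adopts; the lowercase names here are illustrative only, not the manager's real fields.

package main

import (
	"fmt"
	"sync/atomic"
)

// Minimal stand-in for the manager's new bookkeeping: LastSubmittedHeight
// replaces the old SyncTarget atomic on the submission side.
type manager struct {
	lastSubmittedHeight atomic.Uint64
}

// New batches start at the last submitted height + 1, which is the convention
// NextHeightToSubmit encodes in the diff below.
func (m *manager) nextHeightToSubmit() uint64 {
	return m.lastSubmittedHeight.Load() + 1
}

func main() {
	m := &manager{}
	m.lastSubmittedHeight.Store(41)     // e.g. recorded after a successful batch submission
	fmt.Println(m.nextHeightToSubmit()) // 42
}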
8 changes: 4 additions & 4 deletions block/block.go
@@ -90,8 +90,8 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
}

// Prune old heights, if requested by ABCI app.
- if retainHeight > 0 {
- _, err := m.pruneBlocks(uint64(retainHeight))
+ if 0 < retainHeight {
+ err = m.pruneBlocks(uint64(retainHeight))
if err != nil {
m.logger.Error("prune blocks", "retain_height", retainHeight, "err", err)
}
@@ -114,8 +114,8 @@ func (m *Manager) isHeightAlreadyApplied(blockHeight uint64) (bool, error) {
}

func (m *Manager) attemptApplyCachedBlocks() error {
- m.retrieverMutex.Lock()
- defer m.retrieverMutex.Unlock()
+ m.retrieverMu.Lock()
+ defer m.retrieverMu.Unlock()

for {
expectedHeight := m.State.NextHeight()
6 changes: 3 additions & 3 deletions block/gossip.go
@@ -14,11 +14,11 @@ func (m *Manager) onNewGossipedBlock(event pubsub.Message) {
eventData, _ := event.Data().(p2p.GossipedBlock)
block := eventData.Block
commit := eventData.Commit
- m.retrieverMutex.Lock() // needed to protect blockCache access
+ m.retrieverMu.Lock() // needed to protect blockCache access
_, found := m.blockCache[block.Header.Height]
// It is not strictly necessary to return early, for correctness, but doing so helps us avoid mutex pressure and unnecessary repeated attempts to apply cached blocks
if found {
- m.retrieverMutex.Unlock()
+ m.retrieverMu.Unlock()
return
}

@@ -31,7 +31,7 @@ func (m *Manager) onNewGossipedBlock(event pubsub.Message) {
Commit: &commit,
}
}
- m.retrieverMutex.Unlock() // have to give this up as it's locked again in attempt apply, and we're not re-entrant
+ m.retrieverMu.Unlock() // have to give this up as it's locked again in attempt apply, and we're not re-entrant

err := m.attemptApplyCachedBlocks()
if err != nil {
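The unlock-before-call comment above exists because Go's sync.Mutex is not re-entrant: attemptApplyCachedBlocks acquires the same retrieverMu again. A standalone sketch of that hand-off pattern, using hypothetical names rather than the manager's real fields:

package main

import (
	"fmt"
	"sync"
)

type cacheOwner struct {
	mu    sync.Mutex // plays the role of retrieverMu; Go mutexes are not re-entrant
	cache map[uint64]string
}

func (c *cacheOwner) onGossip(height uint64, payload string) {
	c.mu.Lock()
	if _, found := c.cache[height]; found {
		c.mu.Unlock() // early return: nothing new to cache, no need to re-apply
		return
	}
	c.cache[height] = payload
	// The lock must be released here: applyCached takes the same mutex and
	// would deadlock if we still held it (the "not re-entrant" point above).
	c.mu.Unlock()
	c.applyCached()
}

func (c *cacheOwner) applyCached() {
	c.mu.Lock()
	defer c.mu.Unlock()
	fmt.Println("applying", len(c.cache), "cached entries")
}

func main() {
	c := &cacheOwner{cache: map[uint64]string{}}
	c.onGossip(7, "block at height 7")
	c.onGossip(7, "duplicate, ignored")
}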
107 changes: 61 additions & 46 deletions block/manager.go
@@ -31,6 +31,8 @@ import (

// Manager is responsible for aggregating transactions into blocks.
type Manager struct {
+ logger types.Logger
+
// Configuration
Conf config.BlockManagerConfig
Genesis *tmtypes.GenesisDoc
@@ -47,25 +49,30 @@ type Manager struct {
DAClient da.DataAvailabilityLayerClient
SLClient settlement.LayerI

- // Data retrieval
- Retriever da.BatchRetriever
- SyncTargetDiode diodes.Diode
- SyncTarget atomic.Uint64

- // Block production
- producedSizeCh chan uint64 // channel for the producer to report the size of the block it produced
+ /*
+ Production
+ */
+ producedSizeCh chan uint64 // for the producer to report the size of the block it produced

- // Submitter
+ /*
+ Submission
+ */
AccumulatedBatchSize atomic.Uint64
+ // The last height which was submitted to both sublayers, that we know of. When we produce new batches, we will
+ // start at this height + 1. Note: only accessed by one thread at a time so doesn't need synchro.
+ // It is ALSO used by the producer, because the producer needs to check if it can prune blocks and it wont'
+ // prune anything that might be submitted in the future. Therefore, it must be atomic.
+ LastSubmittedHeight atomic.Uint64

/*
- Protect against processing two blocks at once when there are two routines handling incoming gossiped blocks,
- and incoming DA blocks, respectively.
+ Retrieval
*/
- retrieverMutex sync.Mutex

- logger types.Logger

+ // Protect against processing two blocks at once when there are two routines handling incoming gossiped blocks,
+ // and incoming DA blocks, respectively.
+ retrieverMu sync.Mutex
+ Retriever da.BatchRetriever
+ // get the next target height to sync local state to
+ targetSyncHeight diodes.Diode
// Cached blocks and commits for applying at future heights. The blocks may not be valid, because
// we can only do full validation in sequential order.
blockCache map[uint64]CachedBlock
@@ -101,21 +108,21 @@ func NewManager(
}

agg := &Manager{
- Pubsub: pubsub,
- p2pClient: p2pClient,
- ProposerKey: proposerKey,
- Conf: conf,
- Genesis: genesis,
- State: s,
- Store: store,
- Executor: exec,
- DAClient: dalc,
- SLClient: settlementClient,
- Retriever: dalc.(da.BatchRetriever),
- SyncTargetDiode: diodes.NewOneToOne(1, nil),
- producedSizeCh: make(chan uint64),
- logger: logger,
- blockCache: make(map[uint64]CachedBlock),
+ Pubsub: pubsub,
+ p2pClient: p2pClient,
+ ProposerKey: proposerKey,
+ Conf: conf,
+ Genesis: genesis,
+ State: s,
+ Store: store,
+ Executor: exec,
+ DAClient: dalc,
+ SLClient: settlementClient,
+ Retriever: dalc.(da.BatchRetriever),
+ targetSyncHeight: diodes.NewOneToOne(1, nil),
+ producedSizeCh: make(chan uint64),
+ logger: logger,
+ blockCache: make(map[uint64]CachedBlock),
}

return agg, nil
@@ -125,13 +132,11 @@ func NewManager(
func (m *Manager) Start(ctx context.Context) error {
m.logger.Info("Starting the block manager")

- slProposerKey := m.SLClient.GetProposer().PublicKey.Bytes()
- localProposerKey, err := m.ProposerKey.GetPublic().Raw()
+ isSequencer, err := m.IsSequencerVerify()
if err != nil {
return fmt.Errorf("get local node public key: %w", err)
return err
}

- isSequencer := bytes.Equal(slProposerKey, localProposerKey)
m.logger.Info("Starting block manager", "isSequencer", isSequencer)

// Check if InitChain flow is needed
@@ -159,39 +164,49 @@ func (m *Manager) Start(ctx context.Context) error {
go m.SubmitLoop(ctx)
} else {
go m.RetrieveLoop(ctx)
- go m.SyncTargetLoop(ctx)
+ go m.SyncToTargetHeightLoop(ctx)
}

return nil
}

+ func (m *Manager) IsSequencerVerify() (bool, error) {
+ slProposerKey := m.SLClient.GetProposer().PublicKey.Bytes()
+ localProposerKey, err := m.ProposerKey.GetPublic().Raw()
+ if err != nil {
+ return false, fmt.Errorf("get local node public key: %w", err)
+ }
+ return bytes.Equal(slProposerKey, localProposerKey), nil
+ }
+
+ func (m *Manager) IsSequencer() bool {
+ ret, _ := m.IsSequencerVerify()
+ return ret
+ }
+
+ func (m *Manager) NextHeightToSubmit() uint64 {
+ return m.LastSubmittedHeight.Load() + 1
+ }

// syncBlockManager enforces the node to be synced on initial run.
func (m *Manager) syncBlockManager() error {
res, err := m.SLClient.RetrieveBatch()
if errors.Is(err, gerr.ErrNotFound) {
// The SL hasn't got any batches for this chain yet.
m.logger.Info("No batches for chain found in SL.")
- m.SyncTarget.Store(uint64(m.Genesis.InitialHeight - 1))
+ m.LastSubmittedHeight.Store(uint64(m.Genesis.InitialHeight - 1))
return nil
}
if err != nil {
// TODO: separate between fresh rollapp and non-registered rollapp
return err
}
- // Set the syncTarget according to the result
- m.SyncTarget.Store(res.EndHeight)
- err = m.syncUntilTarget(res.EndHeight)
+ m.LastSubmittedHeight.Store(res.EndHeight)
+ err = m.syncToTargetHeight(res.EndHeight)
if err != nil {
return err
}

m.logger.Info("Synced.", "current height", m.State.Height(), "syncTarget", m.SyncTarget.Load())
m.logger.Info("Synced.", "current height", m.State.Height(), "last submitted height", m.LastSubmittedHeight.Load())
return nil
}

- // UpdateSyncParams updates the sync target and state index if necessary
- func (m *Manager) UpdateSyncParams(endHeight uint64) {
- types.RollappHubHeightGauge.Set(float64(endHeight))
- m.logger.Info("SyncTarget updated", "syncTarget", endHeight)
- m.SyncTarget.Store(endHeight)
- }
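IsSequencerVerify reduces to a byte comparison between the settlement layer's proposer public key and the local proposer public key. A minimal sketch of that check in isolation (key extraction from the real key types is elided; plain byte slices stand in for them):

package main

import (
	"bytes"
	"fmt"
)

// isSequencer mirrors the shape of IsSequencerVerify: the node is the sequencer
// iff the settlement layer's proposer public key equals the local proposer
// public key.
func isSequencer(slProposerKey, localProposerKey []byte) bool {
	return bytes.Equal(slProposerKey, localProposerKey)
}

func main() {
	sl := []byte{0x01, 0x02, 0x03}
	local := []byte{0x01, 0x02, 0x03}
	other := []byte{0x09}
	fmt.Println(isSequencer(sl, local)) // true: this node runs the produce/submit loops
	fmt.Println(isSequencer(sl, other)) // false: this node runs the retrieve/sync loops instead
}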
12 changes: 5 additions & 7 deletions block/manager_test.go
@@ -118,9 +118,8 @@ func TestProduceOnlyAfterSynced(t *testing.T) {

t.Log("Taking the manager out of sync by submitting a batch")

- syncTarget := manager.SyncTarget.Load()
numBatchesToAdd := 2
- nextBatchStartHeight := syncTarget + 1
+ nextBatchStartHeight := manager.NextHeightToSubmit()
var batch *types.Batch
for i := 0; i < numBatchesToAdd; i++ {
batch, err = testutil.GenerateBatch(nextBatchStartHeight, nextBatchStartHeight+uint64(testutil.DefaultTestBatchSize-1), manager.ProposerKey)
@@ -135,7 +134,7 @@ func TestProduceOnlyAfterSynced(t *testing.T) {
}

// Initially sync target is 0
- assert.Zero(t, manager.SyncTarget.Load())
+ assert.Zero(t, manager.LastSubmittedHeight.Load())
assert.True(t, manager.State.Height() == 0)

// enough time to sync and produce blocks
@@ -149,7 +148,7 @@ func TestProduceOnlyAfterSynced(t *testing.T) {
assert.NoError(t, err)
}()
<-ctx.Done()
- assert.Equal(t, batch.EndHeight, manager.SyncTarget.Load())
+ assert.Equal(t, batch.EndHeight, manager.LastSubmittedHeight.Load())
// validate that we produced blocks
assert.Greater(t, manager.State.Height(), batch.EndHeight)
}
@@ -353,7 +352,7 @@ func TestCreateNextDABatchWithBytesLimit(t *testing.T) {
}

// Call createNextDABatch function
- startHeight := manager.SyncTarget.Load() + 1
+ startHeight := manager.NextHeightToSubmit()
endHeight := startHeight + uint64(tc.blocksToProduce) - 1
batch, err := manager.CreateNextBatchToSubmit(startHeight, endHeight)
assert.NoError(err)
@@ -397,8 +396,7 @@ func TestDAFetch(t *testing.T) {

app.On("Commit", mock.Anything).Return(abci.ResponseCommit{Data: commitHash[:]})

- syncTarget := manager.SyncTarget.Load()
- nextBatchStartHeight := syncTarget + 1
+ nextBatchStartHeight := manager.NextHeightToSubmit()
batch, err := testutil.GenerateBatch(nextBatchStartHeight, nextBatchStartHeight+uint64(testutil.DefaultTestBatchSize-1), manager.ProposerKey)
require.NoError(err)
daResultSubmitBatch := manager.DAClient.SubmitBatch(batch)
16 changes: 8 additions & 8 deletions block/pruning.go
@@ -2,28 +2,28 @@ package block

import (
"fmt"
- )

- func (m *Manager) pruneBlocks(retainHeight uint64) (uint64, error) {
- syncTarget := m.SyncTarget.Load()
+ "github.com/dymensionxyz/dymint/gerr"
+ )

- if retainHeight > syncTarget {
- return 0, fmt.Errorf("cannot prune uncommitted blocks")
+ func (m *Manager) pruneBlocks(retainHeight uint64) error {
+ if m.IsSequencer() && retainHeight <= m.NextHeightToSubmit() { // do not delete anything that we might submit in future
+ return fmt.Errorf("cannot prune blocks before they have been submitted: %d: %w", retainHeight, gerr.ErrInvalidArgument)
}

pruned, err := m.Store.PruneBlocks(m.State.BaseHeight, retainHeight)
if err != nil {
return 0, fmt.Errorf("prune block store: %w", err)
return fmt.Errorf("prune block store: %w", err)
}

// TODO: prune state/indexer and state/txindexer??

m.State.BaseHeight = retainHeight
_, err = m.Store.SaveState(m.State, nil)
if err != nil {
return 0, fmt.Errorf("save state: %w", err)
return fmt.Errorf("save state: %w", err)
}

m.logger.Info("pruned blocks", "pruned", pruned, "retain_height", retainHeight)
- return pruned, nil
+ return nil
}
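The reworked pruneBlocks drops the pruned-count return value and guards a sequencer against pruning heights it may still need to submit. A simplified, store-free sketch that mirrors the guard condition from the diff (names and the in-memory "store" are illustrative):

package main

import (
	"errors"
	"fmt"
)

var errInvalidArgument = errors.New("invalid argument") // stand-in for gerr.ErrInvalidArgument

type pruner struct {
	isSequencer         bool
	lastSubmittedHeight uint64
	baseHeight          uint64
}

func (p *pruner) nextHeightToSubmit() uint64 { return p.lastSubmittedHeight + 1 }

// pruneBlocks returns only an error, mirroring the new signature. As in the
// diff, the guard rejects retain heights at or below the next height to
// submit when running as the sequencer.
func (p *pruner) pruneBlocks(retainHeight uint64) error {
	if p.isSequencer && retainHeight <= p.nextHeightToSubmit() {
		return fmt.Errorf("cannot prune blocks before they have been submitted: %d: %w", retainHeight, errInvalidArgument)
	}
	pruned := retainHeight - p.baseHeight // the real code delegates to the block store
	p.baseHeight = retainHeight
	fmt.Println("pruned", pruned, "heights; new base height", p.baseHeight)
	return nil
}

func main() {
	p := &pruner{isSequencer: true, lastSubmittedHeight: 100, baseHeight: 1}
	fmt.Println(p.pruneBlocks(50))  // rejected: 50 <= 101
	fmt.Println(p.pruneBlocks(150)) // allowed: guard passes, base height moves to 150
}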
17 changes: 8 additions & 9 deletions block/retriever.go
@@ -15,28 +15,27 @@ import (
// fetching batches from the settlement layer and then fetching the actual blocks
// from the DA.
func (m *Manager) RetrieveLoop(ctx context.Context) {
m.logger.Info("started retrieve loop")
syncTargetPoller := diodes.NewPoller(m.SyncTargetDiode, diodes.WithPollingContext(ctx))
m.logger.Info("Started retrieve loop.")
p := diodes.NewPoller(m.targetSyncHeight, diodes.WithPollingContext(ctx))

for {
select {
case <-ctx.Done():
return
default:
- // Get only the latest sync target
- targetHeight := syncTargetPoller.Next()
- err := m.syncUntilTarget(*(*uint64)(targetHeight))
+ targetHeight := p.Next() // We only care about the latest one
+ err := m.syncToTargetHeight(*(*uint64)(targetHeight))
if err != nil {
panic(fmt.Errorf("sync until target: %w", err))
}
}
}
}

- // syncUntilTarget syncs blocks until the target height is reached.
+ // syncToTargetHeight syncs blocks until the target height is reached.
// It fetches the batches from the settlement, gets the DA height and gets
// the actual blocks from the DA.
- func (m *Manager) syncUntilTarget(targetHeight uint64) error {
+ func (m *Manager) syncToTargetHeight(targetHeight uint64) error {
for currH := m.State.Height(); currH < targetHeight; currH = m.State.Height() {

// It's important that we query the state index before fetching the batch, rather
@@ -101,8 +100,8 @@ func (m *Manager) ProcessNextDABatch(daMetaData *da.DASubmitMetaData) error {

m.logger.Debug("retrieved batches", "n", len(batchResp.Batches), "daHeight", daMetaData.Height)

- m.retrieverMutex.Lock()
- defer m.retrieverMutex.Unlock()
+ m.retrieverMu.Lock()
+ defer m.retrieverMu.Unlock()

for _, batch := range batchResp.Batches {
for i, block := range batch.Blocks {
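targetSyncHeight is a diode: a fixed-size, drop-oldest buffer, so a slow retrieve loop only ever sees the most recent target height rather than a backlog. A small sketch of the set/poll pattern, assuming the code.cloudfoundry.org/go-diodes package that the diodes identifiers appear to come from:

package main

import (
	"context"
	"fmt"
	"time"
	"unsafe"

	diodes "code.cloudfoundry.org/go-diodes"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// A one-to-one diode of size 1 keeps only the newest value: if the writer
	// publishes several target heights before the reader wakes up, older ones
	// are dropped, which matches "We only care about the latest one".
	target := diodes.NewOneToOne(1, nil)
	poller := diodes.NewPoller(target, diodes.WithPollingContext(ctx))

	// Writer side (the settlement-layer event handler in the real code).
	for _, h := range []uint64{10, 20, 30} {
		h := h
		target.Set(diodes.GenericDataType(unsafe.Pointer(&h)))
	}

	// Reader side (the retrieve loop): only the most recent height survives.
	got := poller.Next()
	fmt.Println("sync to height:", *(*uint64)(got))
}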
5 changes: 3 additions & 2 deletions block/submit.go
@@ -111,7 +111,7 @@ func (m *Manager) AccumulatedDataLoop(ctx context.Context, toSubmit chan struct{
func (m *Manager) HandleSubmissionTrigger() error {
// Load current sync target and height to determine if new blocks are available for submission.

- startHeight := m.SyncTarget.Load() + 1
+ startHeight := m.NextHeightToSubmit()
endHeightInclusive := m.State.Height()

if endHeightInclusive < startHeight {
@@ -137,7 +137,8 @@ func (m *Manager) HandleSubmissionTrigger() error {
}
m.logger.Info("Submitted batch to SL.", "start height", resultSubmitToDA, "end height", nextBatch.EndHeight)

- m.UpdateSyncParams(actualEndHeight)
+ types.RollappHubHeightGauge.Set(float64(actualEndHeight))
+ m.LastSubmittedHeight.Store(actualEndHeight)
return nil
}

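With UpdateSyncParams removed, HandleSubmissionTrigger records the result itself: bump the hub-height gauge and store the last submitted height. A minimal sketch of that bookkeeping, assuming RollappHubHeightGauge is a Prometheus gauge (the metric name below is made up, not the real one):

package main

import (
	"fmt"
	"sync/atomic"

	"github.com/prometheus/client_golang/prometheus"
)

var (
	// Hypothetical stand-ins for types.RollappHubHeightGauge and
	// Manager.LastSubmittedHeight.
	hubHeightGauge = prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "example_rollapp_hub_height",
		Help: "illustrative gauge for the post-submission bookkeeping sketch",
	})
	lastSubmittedHeight atomic.Uint64
)

// recordSubmitted is the inline bookkeeping now done after a successful batch
// submission: update the gauge and the atomic last-submitted height.
func recordSubmitted(endHeight uint64) {
	hubHeightGauge.Set(float64(endHeight))
	lastSubmittedHeight.Store(endHeight)
}

func main() {
	recordSubmitted(128)
	fmt.Println("next height to submit:", lastSubmittedHeight.Load()+1)
}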