fix(manager): removes outdated syncTarget field #854

Merged · 29 commits · May 16, 2024

Commits
7b280ee
BROKEN - few changes to start removing sync target
danwt May 13, 2024
744d90c
removes UpdateSyncParams as a func
danwt May 13, 2024
c4db6ff
renames sync to target -> sync to target height
danwt May 13, 2024
6b3f56b
renames and reshuffles
danwt May 13, 2024
e17b234
removes all non test reads of sync target
danwt May 13, 2024
16cd5db
fix all non test access
danwt May 13, 2024
a587a01
rename to LastSubmittedHeight
danwt May 13, 2024
e1de9b2
removes test references
danwt May 13, 2024
88abca6
fix retrieve loop
danwt May 13, 2024
a054d81
get rid of the unnecssary prune check
danwt May 13, 2024
afde4f8
rename SyncToTargetHeightLoop
danwt May 13, 2024
0facc1e
simplify
danwt May 13, 2024
f290b86
simplify metric
danwt May 13, 2024
d308d95
test fix
danwt May 13, 2024
9606d91
improve docstring
danwt May 13, 2024
df608b5
merge main
danwt May 15, 2024
c1da14f
merge in main and manually resolve conflicts (BROKEN)
danwt May 16, 2024
ad7c279
adds in more manually resolved conflicts (BROKEN)
danwt May 16, 2024
448db03
fix one breakage
danwt May 16, 2024
b34da8d
rename and move retriever mu
danwt May 16, 2024
a37f10a
tweaks
danwt May 16, 2024
742ee1e
fmt
danwt May 16, 2024
56df81d
adds a protective check for pruning, but need to make last submitted …
danwt May 16, 2024
a22895f
use atomic on last submitted
danwt May 16, 2024
e64d78e
fix comment
danwt May 16, 2024
7bce876
add a sequencer check, but not sure it works
danwt May 16, 2024
11757d3
moved pre prune check to prune func
danwt May 16, 2024
c05a41f
move prune if cond
danwt May 16, 2024
74ce466
merge main and resolve conflict
danwt May 16, 2024
8 changes: 4 additions & 4 deletions block/block.go
@@ -90,8 +90,8 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
}

// Prune old heights, if requested by ABCI app.
if retainHeight > 0 {
_, err := m.pruneBlocks(uint64(retainHeight))
if 0 < retainHeight {
err = m.pruneBlocks(uint64(retainHeight))
if err != nil {
m.logger.Error("prune blocks", "retain_height", retainHeight, "err", err)
}
@@ -114,8 +114,8 @@ func (m *Manager) isHeightAlreadyApplied(blockHeight uint64) (bool, error) {
}

func (m *Manager) attemptApplyCachedBlocks() error {
m.retrieverMutex.Lock()
defer m.retrieverMutex.Unlock()
m.retrieverMu.Lock()
defer m.retrieverMu.Unlock()

for {
expectedHeight := m.State.NextHeight()
6 changes: 3 additions & 3 deletions block/gossip.go
@@ -14,11 +14,11 @@ func (m *Manager) onNewGossipedBlock(event pubsub.Message) {
eventData, _ := event.Data().(p2p.GossipedBlock)
block := eventData.Block
commit := eventData.Commit
m.retrieverMutex.Lock() // needed to protect blockCache access
m.retrieverMu.Lock() // needed to protect blockCache access
_, found := m.blockCache[block.Header.Height]
// It is not strictly necessary to return early, for correctness, but doing so helps us avoid mutex pressure and unnecessary repeated attempts to apply cached blocks
if found {
m.retrieverMutex.Unlock()
m.retrieverMu.Unlock()
return
}

@@ -31,7 +31,7 @@ func (m *Manager) onNewGossipedBlock(event pubsub.Message) {
Commit: &commit,
}
}
m.retrieverMutex.Unlock() // have to give this up as it's locked again in attempt apply, and we're not re-entrant
m.retrieverMu.Unlock() // have to give this up as it's locked again in attempt apply, and we're not re-entrant

err := m.attemptApplyCachedBlocks()
if err != nil {
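
For context on the unlock comment above: Go's sync.Mutex is not reentrant, so the handler must release retrieverMu before calling attemptApplyCachedBlocks, which takes the same lock. A standalone sketch of the pattern, not part of this diff and using illustrative names:

package main

import (
	"fmt"
	"sync"
)

type cache struct {
	mu     sync.Mutex // stands in for retrieverMu
	blocks map[uint64]string
}

// onGossip mirrors the handler's shape: guard the cache, release the lock,
// then call a method that re-acquires the same mutex.
func (c *cache) onGossip(height uint64, block string) {
	c.mu.Lock()
	if _, found := c.blocks[height]; found {
		c.mu.Unlock()
		return // already cached, nothing more to do
	}
	c.blocks[height] = block
	c.mu.Unlock() // must release here: applyCached locks again

	c.applyCached()
}

func (c *cache) applyCached() {
	c.mu.Lock()
	defer c.mu.Unlock()
	for h, b := range c.blocks {
		fmt.Println("applying", h, b)
		delete(c.blocks, h) // deleting during range is safe in Go
	}
}

func main() {
	c := &cache{blocks: map[uint64]string{}}
	c.onGossip(7, "block payload")
}

If onGossip kept the lock across the applyCached call, the second Lock would deadlock the goroutine.
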
107 changes: 61 additions & 46 deletions block/manager.go
@@ -31,6 +31,8 @@ import (

// Manager is responsible for aggregating transactions into blocks.
type Manager struct {
logger types.Logger

// Configuration
Conf config.BlockManagerConfig
Genesis *tmtypes.GenesisDoc
@@ -47,25 +49,30 @@ type Manager struct {
DAClient da.DataAvailabilityLayerClient
SLClient settlement.LayerI

// Data retrieval
Retriever da.BatchRetriever
SyncTargetDiode diodes.Diode
SyncTarget atomic.Uint64

// Block production
producedSizeCh chan uint64 // channel for the producer to report the size of the block it produced
/*
Production
*/
producedSizeCh chan uint64 // for the producer to report the size of the block it produced

// Submitter
/*
Submission
*/
AccumulatedBatchSize atomic.Uint64
// The last height which was submitted to both sublayers, that we know of. When we produce new batches, we will
// start at this height + 1. Note: only accessed by one thread at a time so doesn't need synchro.
// It is ALSO used by the producer, because the producer needs to check if it can prune blocks and it wont'
// prune anything that might be submitted in the future. Therefore, it must be atomic.
LastSubmittedHeight atomic.Uint64

/*
Protect against processing two blocks at once when there are two routines handling incoming gossiped blocks,
and incoming DA blocks, respectively.
Retrieval
*/
retrieverMutex sync.Mutex

logger types.Logger

// Protect against processing two blocks at once when there are two routines handling incoming gossiped blocks,
// and incoming DA blocks, respectively.
retrieverMu sync.Mutex
Retriever da.BatchRetriever
// get the next target height to sync local state to
targetSyncHeight diodes.Diode
// Cached blocks and commits for applying at future heights. The blocks may not be valid, because
// we can only do full validation in sequential order.
blockCache map[uint64]CachedBlock
@@ -101,21 +108,21 @@ func NewManager(
}

agg := &Manager{
Pubsub: pubsub,
p2pClient: p2pClient,
ProposerKey: proposerKey,
Conf: conf,
Genesis: genesis,
State: s,
Store: store,
Executor: exec,
DAClient: dalc,
SLClient: settlementClient,
Retriever: dalc.(da.BatchRetriever),
SyncTargetDiode: diodes.NewOneToOne(1, nil),
producedSizeCh: make(chan uint64),
logger: logger,
blockCache: make(map[uint64]CachedBlock),
Pubsub: pubsub,
p2pClient: p2pClient,
ProposerKey: proposerKey,
Conf: conf,
Genesis: genesis,
State: s,
Store: store,
Executor: exec,
DAClient: dalc,
SLClient: settlementClient,
Retriever: dalc.(da.BatchRetriever),
targetSyncHeight: diodes.NewOneToOne(1, nil),
producedSizeCh: make(chan uint64),
logger: logger,
blockCache: make(map[uint64]CachedBlock),
}

return agg, nil
@@ -125,13 +132,11 @@ func NewManager(
func (m *Manager) Start(ctx context.Context) error {
m.logger.Info("Starting the block manager")

slProposerKey := m.SLClient.GetProposer().PublicKey.Bytes()
localProposerKey, err := m.ProposerKey.GetPublic().Raw()
isSequencer, err := m.IsSequencerVerify()
if err != nil {
return fmt.Errorf("get local node public key: %w", err)
return err
}

isSequencer := bytes.Equal(slProposerKey, localProposerKey)
m.logger.Info("Starting block manager", "isSequencer", isSequencer)

// Check if InitChain flow is needed
@@ -159,39 +164,49 @@ func (m *Manager) Start(ctx context.Context) error {
go m.SubmitLoop(ctx)
} else {
go m.RetrieveLoop(ctx)
go m.SyncTargetLoop(ctx)
go m.SyncToTargetHeightLoop(ctx)
}

return nil
}

func (m *Manager) IsSequencerVerify() (bool, error) {
slProposerKey := m.SLClient.GetProposer().PublicKey.Bytes()
localProposerKey, err := m.ProposerKey.GetPublic().Raw()
if err != nil {
return false, fmt.Errorf("get local node public key: %w", err)
}
return bytes.Equal(slProposerKey, localProposerKey), nil
}

func (m *Manager) IsSequencer() bool {
ret, _ := m.IsSequencerVerify()
return ret
}

func (m *Manager) NextHeightToSubmit() uint64 {
return m.LastSubmittedHeight.Load() + 1
}

// syncBlockManager enforces the node to be synced on initial run.
func (m *Manager) syncBlockManager() error {
res, err := m.SLClient.RetrieveBatch()
if errors.Is(err, gerr.ErrNotFound) {
// The SL hasn't got any batches for this chain yet.
m.logger.Info("No batches for chain found in SL.")
m.SyncTarget.Store(uint64(m.Genesis.InitialHeight - 1))
m.LastSubmittedHeight.Store(uint64(m.Genesis.InitialHeight - 1))
return nil
}
if err != nil {
// TODO: separate between fresh rollapp and non-registered rollapp
return err
}
// Set the syncTarget according to the result
m.SyncTarget.Store(res.EndHeight)
err = m.syncUntilTarget(res.EndHeight)
m.LastSubmittedHeight.Store(res.EndHeight)
err = m.syncToTargetHeight(res.EndHeight)
if err != nil {
return err
}

m.logger.Info("Synced.", "current height", m.State.Height(), "syncTarget", m.SyncTarget.Load())
m.logger.Info("Synced.", "current height", m.State.Height(), "last submitted height", m.LastSubmittedHeight.Load())
return nil
}

// UpdateSyncParams updates the sync target and state index if necessary
func (m *Manager) UpdateSyncParams(endHeight uint64) {
types.RollappHubHeightGauge.Set(float64(endHeight))
m.logger.Info("SyncTarget updated", "syncTarget", endHeight)
m.SyncTarget.Store(endHeight)
}
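
The comment on LastSubmittedHeight is the reason the field became an atomic: the submitter goroutine advances it while the producer goroutine reads it to decide whether pruning is safe. A standalone sketch of that interaction, not part of this diff and with simplified names:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	// Stand-in for Manager.LastSubmittedHeight: written by the submitter,
	// read by the producer, so a plain uint64 would be a data race.
	var lastSubmitted atomic.Uint64
	var wg sync.WaitGroup
	wg.Add(2)

	// Submitter: records the end height of each batch it submits.
	go func() {
		defer wg.Done()
		for h := uint64(10); h <= 50; h += 10 {
			lastSubmitted.Store(h)
		}
	}()

	// Producer: never prunes a height it might still need to submit.
	go func() {
		defer wg.Done()
		retainHeight := uint64(25)
		nextToSubmit := lastSubmitted.Load() + 1 // NextHeightToSubmit()
		if retainHeight <= nextToSubmit {
			fmt.Println("skip pruning: heights may still need to be submitted")
			return
		}
		fmt.Println("safe to prune below", retainHeight)
	}()

	wg.Wait()
}
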
12 changes: 5 additions & 7 deletions block/manager_test.go
@@ -118,9 +118,8 @@ func TestProduceOnlyAfterSynced(t *testing.T) {

t.Log("Taking the manager out of sync by submitting a batch")

syncTarget := manager.SyncTarget.Load()
numBatchesToAdd := 2
nextBatchStartHeight := syncTarget + 1
nextBatchStartHeight := manager.NextHeightToSubmit()
var batch *types.Batch
for i := 0; i < numBatchesToAdd; i++ {
batch, err = testutil.GenerateBatch(nextBatchStartHeight, nextBatchStartHeight+uint64(testutil.DefaultTestBatchSize-1), manager.ProposerKey)
@@ -135,7 +134,7 @@ }
}

// Initially sync target is 0
assert.Zero(t, manager.SyncTarget.Load())
assert.Zero(t, manager.LastSubmittedHeight.Load())
assert.True(t, manager.State.Height() == 0)

// enough time to sync and produce blocks
@@ -149,7 +148,7 @@
assert.NoError(t, err)
}()
<-ctx.Done()
assert.Equal(t, batch.EndHeight, manager.SyncTarget.Load())
assert.Equal(t, batch.EndHeight, manager.LastSubmittedHeight.Load())
// validate that we produced blocks
assert.Greater(t, manager.State.Height(), batch.EndHeight)
}
@@ -353,7 +352,7 @@ func TestCreateNextDABatchWithBytesLimit(t *testing.T) {
}

// Call createNextDABatch function
startHeight := manager.SyncTarget.Load() + 1
startHeight := manager.NextHeightToSubmit()
endHeight := startHeight + uint64(tc.blocksToProduce) - 1
batch, err := manager.CreateNextBatchToSubmit(startHeight, endHeight)
assert.NoError(err)
@@ -397,8 +396,7 @@ func TestDAFetch(t *testing.T) {

app.On("Commit", mock.Anything).Return(abci.ResponseCommit{Data: commitHash[:]})

syncTarget := manager.SyncTarget.Load()
nextBatchStartHeight := syncTarget + 1
nextBatchStartHeight := manager.NextHeightToSubmit()
batch, err := testutil.GenerateBatch(nextBatchStartHeight, nextBatchStartHeight+uint64(testutil.DefaultTestBatchSize-1), manager.ProposerKey)
require.NoError(err)
daResultSubmitBatch := manager.DAClient.SubmitBatch(batch)
16 changes: 8 additions & 8 deletions block/pruning.go
@@ -2,28 +2,28 @@ package block

import (
"fmt"
)

func (m *Manager) pruneBlocks(retainHeight uint64) (uint64, error) {
syncTarget := m.SyncTarget.Load()
"github.com/dymensionxyz/dymint/gerr"
)

if retainHeight > syncTarget {
return 0, fmt.Errorf("cannot prune uncommitted blocks")
func (m *Manager) pruneBlocks(retainHeight uint64) error {
if m.IsSequencer() && retainHeight <= m.NextHeightToSubmit() { // do not delete anything that we might submit in future
return fmt.Errorf("cannot prune blocks before they have been submitted: %d: %w", retainHeight, gerr.ErrInvalidArgument)
}

pruned, err := m.Store.PruneBlocks(m.State.BaseHeight, retainHeight)
if err != nil {
return 0, fmt.Errorf("prune block store: %w", err)
return fmt.Errorf("prune block store: %w", err)
}

// TODO: prune state/indexer and state/txindexer??

m.State.BaseHeight = retainHeight
_, err = m.Store.SaveState(m.State, nil)
if err != nil {
return 0, fmt.Errorf("save state: %w", err)
return fmt.Errorf("save state: %w", err)
}

m.logger.Info("pruned blocks", "pruned", pruned, "retain_height", retainHeight)
return pruned, nil
return nil
}
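
The guard wraps gerr.ErrInvalidArgument with %w, so callers can branch on the sentinel rather than parse the message. A standalone sketch of that error shape, not part of this diff; the local sentinel stands in for the one in github.com/dymensionxyz/dymint/gerr:

package main

import (
	"errors"
	"fmt"
)

// errInvalidArgument is a stand-in for gerr.ErrInvalidArgument.
var errInvalidArgument = errors.New("invalid argument")

// pruneGuard mirrors the new check in pruneBlocks: a sequencer must not delete
// blocks at or above the next height it still has to submit.
func pruneGuard(isSequencer bool, retainHeight, nextHeightToSubmit uint64) error {
	if isSequencer && retainHeight <= nextHeightToSubmit {
		return fmt.Errorf("cannot prune blocks before they have been submitted: %d: %w", retainHeight, errInvalidArgument)
	}
	return nil
}

func main() {
	err := pruneGuard(true, 90, 100)
	fmt.Println(errors.Is(err, errInvalidArgument)) // true: %w preserves the sentinel
}
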
17 changes: 8 additions & 9 deletions block/retriever.go
@@ -15,28 +15,27 @@ import (
// fetching batches from the settlement layer and then fetching the actual blocks
// from the DA.
func (m *Manager) RetrieveLoop(ctx context.Context) {
m.logger.Info("started retrieve loop")
syncTargetPoller := diodes.NewPoller(m.SyncTargetDiode, diodes.WithPollingContext(ctx))
m.logger.Info("Started retrieve loop.")
p := diodes.NewPoller(m.targetSyncHeight, diodes.WithPollingContext(ctx))

for {
select {
case <-ctx.Done():
return
default:
// Get only the latest sync target
targetHeight := syncTargetPoller.Next()
err := m.syncUntilTarget(*(*uint64)(targetHeight))
targetHeight := p.Next() // We only care about the latest one
err := m.syncToTargetHeight(*(*uint64)(targetHeight))
if err != nil {
panic(fmt.Errorf("sync until target: %w", err))
}
}
}
}

// syncUntilTarget syncs blocks until the target height is reached.
// syncToTargetHeight syncs blocks until the target height is reached.
// It fetches the batches from the settlement, gets the DA height and gets
// the actual blocks from the DA.
func (m *Manager) syncUntilTarget(targetHeight uint64) error {
func (m *Manager) syncToTargetHeight(targetHeight uint64) error {
for currH := m.State.Height(); currH < targetHeight; currH = m.State.Height() {

// It's important that we query the state index before fetching the batch, rather
@@ -101,8 +100,8 @@ func (m *Manager) ProcessNextDABatch(daMetaData *da.DASubmitMetaData) error {

m.logger.Debug("retrieved batches", "n", len(batchResp.Batches), "daHeight", daMetaData.Height)

m.retrieverMutex.Lock()
defer m.retrieverMutex.Unlock()
m.retrieverMu.Lock()
defer m.retrieverMu.Unlock()

for _, batch := range batchResp.Batches {
for i, block := range batch.Blocks {
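
RetrieveLoop leans on the diode's overwrite semantics: a single-slot one-to-one diode keeps only the newest value, so a reader that falls behind skips straight to the latest target height instead of draining a backlog. A standalone sketch, not part of this diff, assuming the code.cloudfoundry.org/go-diodes package behind the diodes alias:

package main

import (
	"context"
	"fmt"

	diodes "code.cloudfoundry.org/go-diodes" // assumed import path for the diodes alias
)

func main() {
	// Same construction as the manager: one slot, no alert callback.
	targetSyncHeight := diodes.NewOneToOne(1, nil)

	// Writer side: each new settlement batch overwrites the previous target.
	for _, h := range []uint64{10, 20, 30} {
		h := h // take a stable address for each value
		targetSyncHeight.Set(diodes.GenericDataType(&h))
	}

	// Reader side, mirroring RetrieveLoop: poll and convert back to *uint64.
	p := diodes.NewPoller(targetSyncHeight, diodes.WithPollingContext(context.Background()))
	target := p.Next()
	fmt.Println("sync to height:", *(*uint64)(target)) // 30; the older targets were dropped
}
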
5 changes: 3 additions & 2 deletions block/submit.go
@@ -111,7 +111,7 @@ func (m *Manager) AccumulatedDataLoop(ctx context.Context, toSubmit chan struct{
func (m *Manager) HandleSubmissionTrigger() error {
// Load current sync target and height to determine if new blocks are available for submission.

startHeight := m.SyncTarget.Load() + 1
startHeight := m.NextHeightToSubmit()
endHeightInclusive := m.State.Height()

if endHeightInclusive < startHeight {
@@ -137,7 +137,8 @@ }
}
m.logger.Info("Submitted batch to SL.", "start height", resultSubmitToDA, "end height", nextBatch.EndHeight)

m.UpdateSyncParams(actualEndHeight)
types.RollappHubHeightGauge.Set(float64(actualEndHeight))
m.LastSubmittedHeight.Store(actualEndHeight)
return nil
}

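
HandleSubmissionTrigger now derives the batch window from the atomic instead of SyncTarget: it starts right after the last submitted height and ends at the current chain height, and a successful submission advances the atomic so the next trigger resumes there. A standalone sketch of that bookkeeping, not part of this diff and with simplified names:

package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var lastSubmittedHeight atomic.Uint64
	lastSubmittedHeight.Store(100) // heights up to 100 are already on the DA and settlement layers

	chainHeight := uint64(105) // stand-in for m.State.Height()

	startHeight := lastSubmittedHeight.Load() + 1 // NextHeightToSubmit()
	endHeightInclusive := chainHeight
	if endHeightInclusive < startHeight {
		fmt.Println("nothing new to submit")
		return
	}

	fmt.Printf("submitting batch [%d, %d]\n", startHeight, endHeightInclusive) // [101, 105]

	// The real batch may end earlier, for example when it hits a size limit,
	// so the height that actually made it out is what gets stored.
	actualEndHeight := endHeightInclusive
	lastSubmittedHeight.Store(actualEndHeight)
}
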