From 82abf78a0a7ebeb1b629f533f5a8f8f3258422e2 Mon Sep 17 00:00:00 2001 From: Sergi Rene Date: Thu, 19 Dec 2024 12:28:29 +0100 Subject: [PATCH 1/5] batch time calculation refactor --- block/block.go | 2 -- block/manager.go | 39 ++++++----------------- block/produce.go | 4 +-- block/submit.go | 60 ++++++++++++++++++++++++++++++++--- block/submit_loop_test.go | 5 ++- block/submit_test.go | 4 +-- block/sync.go | 4 +-- settlement/dymension/utils.go | 1 + settlement/settlement.go | 7 ++-- 9 files changed, 80 insertions(+), 46 deletions(-) diff --git a/block/block.go b/block/block.go index 4b4562794..a1d7b90e5 100644 --- a/block/block.go +++ b/block/block.go @@ -127,8 +127,6 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta } - // save last block time used to calculate batch skew time - m.LastBlockTime.Store(block.Header.GetTimestamp().UTC().UnixNano()) // Update the store: // 1. Save the proposer for the current height to the store. // 2. Update the proposer in the state in case of rotation. diff --git a/block/manager.go b/block/manager.go index 61d74a6ab..00407c122 100644 --- a/block/manager.go +++ b/block/manager.go @@ -72,11 +72,8 @@ type Manager struct { Cancel context.CancelFunc Ctx context.Context - // LastBlockTimeInSettlement is the time of last submitted block, used to measure batch skew time - LastBlockTimeInSettlement atomic.Int64 - - // LastBlockTime is the time of last produced block, used to measure batch skew time - LastBlockTime atomic.Int64 + // LastSubmissionTime is the time of last batch submitted in SL + LastSubmissionTime atomic.Int64 // mutex used to avoid stopping node when fork is detected but proposer is creating/sending fork batch forkMu sync.Mutex @@ -310,13 +307,12 @@ func (m *Manager) updateFromLastSettlementState() error { m.logger.Error("Cannot fetch sequencer set from the Hub", "error", err) } - // update latest height from SL - latestHeight, err := m.SLClient.GetLatestHeight() + // get latest submitted batch from SL + latestBatch, err := m.SLClient.GetLatestBatch() if errors.Is(err, gerrc.ErrNotFound) { // The SL hasn't got any batches for this chain yet. m.logger.Info("No batches for chain found in SL.") m.LastSettlementHeight.Store(uint64(m.Genesis.InitialHeight - 1)) //nolint:gosec // height is non-negative and falls in int64 - m.LastBlockTimeInSettlement.Store(m.Genesis.GenesisTime.UTC().UnixNano()) return nil } if err != nil { @@ -324,21 +320,14 @@ func (m *Manager) updateFromLastSettlementState() error { return err } - m.P2PClient.UpdateLatestSeenHeight(latestHeight) - if latestHeight >= m.State.NextHeight() { - m.UpdateTargetHeight(latestHeight) + m.P2PClient.UpdateLatestSeenHeight(latestBatch.EndHeight) + if latestBatch.EndHeight >= m.State.NextHeight() { + m.UpdateTargetHeight(latestBatch.EndHeight) } - m.LastSettlementHeight.Store(latestHeight) - - // init last block in settlement time in dymint state to calculate batch submit skew time - m.SetLastBlockTimeInSettlementFromHeight(latestHeight) + m.LastSettlementHeight.Store(latestBatch.EndHeight) + m.LastSubmissionTime.Store(latestBatch.CreationTime.UTC().UnixNano()) - // init last block time in dymint state to calculate batch submit skew time - block, err := m.Store.LoadBlock(m.State.Height()) - if err == nil { - m.LastBlockTime.Store(block.Header.GetTimestamp().UTC().UnixNano()) - } return nil } @@ -420,13 +409,3 @@ func (m *Manager) freezeNode(err error) { uevent.MustPublish(m.Ctx, m.Pubsub, &events.DataHealthStatus{Error: err}, events.HealthStatusList) m.Cancel() } - -// SetLastBlockTimeInSettlementFromHeight is used to initialize LastBlockTimeInSettlement from rollapp height in settlement -func (m *Manager) SetLastBlockTimeInSettlementFromHeight(lastSettlementHeight uint64) { - block, err := m.Store.LoadBlock(lastSettlementHeight) - if err != nil { - // if settlement height block is not found it will be updated after, when syncing - return - } - m.LastBlockTimeInSettlement.Store(block.Header.GetTimestamp().UTC().UnixNano()) -} diff --git a/block/produce.go b/block/produce.go index 9a67fe77b..a57f46674 100644 --- a/block/produce.go +++ b/block/produce.go @@ -82,9 +82,9 @@ func (m *Manager) ProduceBlockLoop(ctx context.Context, bytesProducedC chan int) return nil case bytesProducedC <- bytesProducedN: default: - evt := &events.DataHealthStatus{Error: fmt.Errorf("Block production paused. Time between last block produced and last block submitted higher than max skew time: %s last block in settlement time: %s %w", m.Conf.MaxSkewTime, time.Unix(0, m.LastBlockTimeInSettlement.Load()), gerrc.ErrResourceExhausted)} + evt := &events.DataHealthStatus{Error: fmt.Errorf("Block production paused. Time between last block produced and last block submitted higher than max skew time: %s last block in settlement time: %s %w", m.Conf.MaxSkewTime, m.GetLastBlockTimeInSettlement(), gerrc.ErrResourceExhausted)} uevent.MustPublish(ctx, m.Pubsub, evt, events.HealthStatusList) - m.logger.Error("Pausing block production until new batch is submitted.", "Batch skew time", m.GetBatchSkewTime(), "Max batch skew time", m.Conf.MaxSkewTime, "Last block in settlement time", time.Unix(0, m.LastBlockTimeInSettlement.Load())) + m.logger.Error("Pausing block production until new batch is submitted.", "Batch skew time", m.GetBatchSkewTime(), "Max batch skew time", m.Conf.MaxSkewTime, "Last block in settlement time", m.GetLastBlockTimeInSettlement()) select { case <-ctx.Done(): return nil diff --git a/block/submit.go b/block/submit.go index 3ee4e2dc4..aac55efbe 100644 --- a/block/submit.go +++ b/block/submit.go @@ -33,6 +33,7 @@ func (m *Manager) SubmitLoop(ctx context.Context, m.GetUnsubmittedBlocks, m.GetUnsubmittedBytes, m.GetBatchSkewTime, + m.isLastBatchRecent, m.Conf.BatchSubmitTime, m.Conf.BatchSubmitBytes, m.CreateAndSubmitBatchGetSizeBlocksCommits, @@ -48,6 +49,7 @@ func SubmitLoopInner( unsubmittedBlocksNum func() uint64, // func that returns the amount of non-submitted blocks unsubmittedBlocksBytes func() int, // func that returns bytes from non-submitted blocks batchSkewTime func() time.Duration, // func that returns measured time between last submitted block and last produced block + isLastBatchRecent func(time.Duration) bool, // func that returns true if the last batch submission time is more recent than batch submission time maxBatchSubmitTime time.Duration, // max time to allow between batches maxBatchSubmitBytes uint64, // max size of serialised batch in bytes createAndSubmitBatch func(maxSizeBytes uint64) (bytes uint64, err error), @@ -103,7 +105,7 @@ func SubmitLoopInner( done := ctx.Err() != nil nothingToSubmit := pending == 0 - lastSubmissionIsRecent := batchSkewTime() < maxBatchSubmitTime + lastSubmissionIsRecent := isLastBatchRecent(maxBatchSubmitTime) maxDataNotExceeded := pending <= maxBatchSubmitBytes UpdateBatchSubmissionGauges(pending, unsubmittedBlocksNum(), batchSkewTime()) @@ -257,7 +259,7 @@ func (m *Manager) SubmitBatch(batch *types.Batch) error { m.LastSettlementHeight.Store(batch.EndHeight()) // update last submitted block time with batch last block (used to calculate max skew time) - m.LastBlockTimeInSettlement.Store(batch.Blocks[len(batch.Blocks)-1].Header.GetTimestamp().UTC().UnixNano()) + m.LastSubmissionTime.Store(time.Now().UTC().UnixNano()) return err } @@ -314,11 +316,59 @@ func (m *Manager) UpdateLastSubmittedHeight(event pubsub.Message) { } } +// GetLastBlockTime returns the time of the last produced block +func (m *Manager) GetLastProducedBlockTime() time.Time { + lastProducedBlock, err := m.Store.LoadBlock(m.State.Height()) + if err != nil { + return time.Time{} + } + return lastProducedBlock.Header.GetTimestamp() +} + +// GetLastBlockTimeInSettlement returns the time of the last submitted block to SL +// If no height in settlement returns first block time +// If no first block produced returns now +// If different error returns empty time +func (m *Manager) GetLastBlockTimeInSettlement() time.Time { + lastBlockInSettlement, err := m.Store.LoadBlock(m.LastSettlementHeight.Load()) + if err != nil && !errors.Is(err, gerrc.ErrNotFound) { + return time.Time{} + } + if errors.Is(err, gerrc.ErrNotFound) { + firstBlock, err := m.Store.LoadBlock(uint64(m.Genesis.InitialHeight)) + if errors.Is(err, gerrc.ErrNotFound) { + return time.Now() + } + if err != nil { + return time.Time{} + } + return firstBlock.Header.GetTimestamp() + + } + return lastBlockInSettlement.Header.GetTimestamp() +} + // GetBatchSkewTime returns the time between the last produced block and the last block submitted to SL func (m *Manager) GetBatchSkewTime() time.Duration { - lastProducedTime := time.Unix(0, m.LastBlockTime.Load()) - lastSubmittedTime := time.Unix(0, m.LastBlockTimeInSettlement.Load()) - return lastProducedTime.Sub(lastSubmittedTime) + return m.GetLastProducedBlockTime().Sub(m.GetLastBlockTimeInSettlement()) +} + +// isLastBatchRecent returns true if the last batch submitted is more recent than submitBatchTime +// in case of no submission time the first block produced is used as a reference. +func (m *Manager) isLastBatchRecent(submitBatchTime time.Duration) bool { + + var lastSubmittedTime time.Time + if m.LastSubmissionTime.Load() == 0 { + firstBlock, err := m.Store.LoadBlock(uint64(m.Genesis.InitialHeight)) + if err != nil { + return true + } else { + lastSubmittedTime = firstBlock.Header.GetTimestamp() + } + } else { + lastSubmittedTime = time.Unix(0, m.LastSubmissionTime.Load()) + } + return time.Now().Sub(lastSubmittedTime) < submitBatchTime } func UpdateBatchSubmissionGauges(skewBytes uint64, skewBlocks uint64, skewTime time.Duration) { diff --git a/block/submit_loop_test.go b/block/submit_loop_test.go index d6ed9d56c..d49093842 100644 --- a/block/submit_loop_test.go +++ b/block/submit_loop_test.go @@ -125,8 +125,11 @@ func testSubmitLoopInner( pendingBytes := func() int { return int(nProducedBytes.Load()) } + isLastBatchRecent := func(time.Duration) bool { + return true + } - block.SubmitLoopInner(ctx, log.NewNopLogger(), producedBytesC, args.batchSkew, accumulatedBlocks, pendingBytes, skewTime, args.maxTime, args.batchBytes, submitBatch) + block.SubmitLoopInner(ctx, log.NewNopLogger(), producedBytesC, args.batchSkew, accumulatedBlocks, pendingBytes, skewTime, isLastBatchRecent, args.maxTime, args.batchBytes, submitBatch) } // Make sure the producer does not get too far ahead diff --git a/block/submit_test.go b/block/submit_test.go index 59017b00f..48e4b7a5d 100644 --- a/block/submit_test.go +++ b/block/submit_test.go @@ -268,7 +268,7 @@ func TestSubmissionByTime(t *testing.T) { manager.DAClient = testutil.GetMockDALC(log.TestingLogger()) manager.Retriever = manager.DAClient.(da.BatchRetriever) - manager.LastBlockTimeInSettlement.Store(time.Now().UTC().UnixNano()) + manager.LastSubmissionTime.Store(time.Now().UTC().UnixNano()) // Check initial height initialHeight := uint64(0) require.Equal(initialHeight, manager.State.Height()) @@ -341,7 +341,7 @@ func TestSubmissionByBatchSize(t *testing.T) { managerConfig.BatchSubmitBytes = c.blockBatchMaxSizeBytes manager, err := testutil.GetManager(managerConfig, nil, 1, 1, 0, proxyApp, nil) require.NoError(err) - manager.LastBlockTimeInSettlement.Store(time.Now().UTC().UnixNano()) + manager.LastSubmissionTime.Store(time.Now().UTC().UnixNano()) manager.DAClient = testutil.GetMockDALC(log.TestingLogger()) manager.Retriever = manager.DAClient.(da.BatchRetriever) diff --git a/block/sync.go b/block/sync.go index 9c3605669..5b9f9aee2 100644 --- a/block/sync.go +++ b/block/sync.go @@ -76,8 +76,8 @@ func (m *Manager) SettlementSyncLoop(ctx context.Context) error { } m.logger.Info("Retrieved state update from SL.", "state_index", settlementBatch.StateIndex) - // we update LastBlockTimeInSettlement to be able to measure batch skew time with last block time in settlement - m.LastBlockTimeInSettlement.Store(settlementBatch.BlockDescriptors[len(settlementBatch.BlockDescriptors)-1].GetTimestamp().UTC().UnixNano()) + // we update LastSubmissionTime to be able to measure batch submission time + m.LastSubmissionTime.Store(settlementBatch.Batch.CreationTime.UTC().UnixNano()) err = m.ApplyBatchFromSL(settlementBatch.Batch) diff --git a/settlement/dymension/utils.go b/settlement/dymension/utils.go index def62fb91..4396744eb 100644 --- a/settlement/dymension/utils.go +++ b/settlement/dymension/utils.go @@ -49,6 +49,7 @@ func convertStateInfoToResultRetrieveBatch(stateInfo *rollapptypes.StateInfo) (* BlockDescriptors: stateInfo.BDs.BD, NumBlocks: stateInfo.NumBlocks, NextSequencer: stateInfo.NextProposer, + CreationTime: stateInfo.CreatedAt, } return &settlement.ResultRetrieveBatch{ diff --git a/settlement/settlement.go b/settlement/settlement.go index 4b03327a2..eddd05a2d 100644 --- a/settlement/settlement.go +++ b/settlement/settlement.go @@ -1,6 +1,8 @@ package settlement import ( + "time" + "github.com/tendermint/tendermint/libs/pubsub" "github.com/dymensionxyz/dymint/da" @@ -42,8 +44,9 @@ type Batch struct { NextSequencer string // MetaData about the batch in the DA layer - MetaData *BatchMetaData - NumBlocks uint64 // FIXME: can be removed. not used and will be deprecated + MetaData *BatchMetaData + NumBlocks uint64 // FIXME: can be removed. not used and will be deprecated + CreationTime time.Time } type ResultRetrieveBatch struct { From 98a248887a7f3872e04d105c858b7be8d465b1be Mon Sep 17 00:00:00 2001 From: Sergi Rene Date: Thu, 19 Dec 2024 14:01:00 +0100 Subject: [PATCH 2/5] add mock for testing --- block/submit.go | 4 ++-- settlement/local/local.go | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/block/submit.go b/block/submit.go index aac55efbe..5c73e7ab6 100644 --- a/block/submit.go +++ b/block/submit.go @@ -88,8 +88,8 @@ func SubmitLoopInner( }) eg.Go(func() error { - // 'submitter': this thread actually creates and submits batches. this thread is woken up every batch_submit_time (in addition to every block produced) to check if there is anything to submit even if no new blocks have been produced - ticker := time.NewTicker(maxBatchSubmitTime) + // 'submitter': this thread actually creates and submits batches. this thread is woken up every batch_submit_time/10 (we used /10 to avoid waiting too much if submission is not required for t-maxBatchSubmitTime, but it maybe required before t) to check if submission is required even if no new blocks have been produced + ticker := time.NewTicker(maxBatchSubmitTime / 10) for { select { case <-ctx.Done(): diff --git a/settlement/local/local.go b/settlement/local/local.go index 4d8a64664..871d114f0 100644 --- a/settlement/local/local.go +++ b/settlement/local/local.go @@ -334,6 +334,7 @@ func (c *Client) convertBatchToSettlementBatch(batch *types.Batch, daResult *da. }, }, BlockDescriptors: bds, + CreationTime: time.Now(), } return settlementBatch From e23f784ef004b6163f18d17e010662257752a0eb Mon Sep 17 00:00:00 2001 From: Sergi Rene Date: Thu, 19 Dec 2024 14:33:43 +0100 Subject: [PATCH 3/5] lint fix --- block/submit.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/block/submit.go b/block/submit.go index 5c73e7ab6..b408e9036 100644 --- a/block/submit.go +++ b/block/submit.go @@ -335,7 +335,7 @@ func (m *Manager) GetLastBlockTimeInSettlement() time.Time { return time.Time{} } if errors.Is(err, gerrc.ErrNotFound) { - firstBlock, err := m.Store.LoadBlock(uint64(m.Genesis.InitialHeight)) + firstBlock, err := m.Store.LoadBlock(uint64(m.Genesis.InitialHeight)) //nolint:gosec // height is non-negative and falls in int64 if errors.Is(err, gerrc.ErrNotFound) { return time.Now() } @@ -356,10 +356,9 @@ func (m *Manager) GetBatchSkewTime() time.Duration { // isLastBatchRecent returns true if the last batch submitted is more recent than submitBatchTime // in case of no submission time the first block produced is used as a reference. func (m *Manager) isLastBatchRecent(submitBatchTime time.Duration) bool { - var lastSubmittedTime time.Time if m.LastSubmissionTime.Load() == 0 { - firstBlock, err := m.Store.LoadBlock(uint64(m.Genesis.InitialHeight)) + firstBlock, err := m.Store.LoadBlock(uint64(m.Genesis.InitialHeight)) //nolint:gosec // height is non-negative and falls in int64 if err != nil { return true } else { @@ -368,7 +367,7 @@ func (m *Manager) isLastBatchRecent(submitBatchTime time.Duration) bool { } else { lastSubmittedTime = time.Unix(0, m.LastSubmissionTime.Load()) } - return time.Now().Sub(lastSubmittedTime) < submitBatchTime + return time.Since(lastSubmittedTime) < submitBatchTime } func UpdateBatchSubmissionGauges(skewBytes uint64, skewBlocks uint64, skewTime time.Duration) { From 9b3b3740e629d2b371025c58b7faefe4fdf2d031 Mon Sep 17 00:00:00 2001 From: Sergi Rene Date: Thu, 19 Dec 2024 14:45:33 +0100 Subject: [PATCH 4/5] prunning --- block/pruning.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/block/pruning.go b/block/pruning.go index 9a92451e9..7dbde61a4 100644 --- a/block/pruning.go +++ b/block/pruning.go @@ -37,7 +37,7 @@ func (m *Manager) PruningLoop(ctx context.Context) error { case retainHeight := <-m.pruningC: var pruningHeight uint64 if m.RunMode == RunModeProposer { // do not delete anything that we might submit in future - pruningHeight = min(m.NextHeightToSubmit(), uint64(retainHeight)) + pruningHeight = min(m.LastSettlementHeight.Load(), uint64(retainHeight)) } else { // do not delete anything that is not validated yet pruningHeight = min(m.SettlementValidator.NextValidationHeight(), uint64(retainHeight)) } From 0c3dfb150fbd4aa4bd4b3c4e4bfc9298f6a8b419 Mon Sep 17 00:00:00 2001 From: Sergi Rene Date: Thu, 19 Dec 2024 22:45:19 +0100 Subject: [PATCH 5/5] var rename --- block/submit.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/block/submit.go b/block/submit.go index b408e9036..c50dbc5a6 100644 --- a/block/submit.go +++ b/block/submit.go @@ -353,9 +353,9 @@ func (m *Manager) GetBatchSkewTime() time.Duration { return m.GetLastProducedBlockTime().Sub(m.GetLastBlockTimeInSettlement()) } -// isLastBatchRecent returns true if the last batch submitted is more recent than submitBatchTime +// isLastBatchRecent returns true if the last batch submitted is more recent than maxBatchSubmitTime // in case of no submission time the first block produced is used as a reference. -func (m *Manager) isLastBatchRecent(submitBatchTime time.Duration) bool { +func (m *Manager) isLastBatchRecent(maxBatchSubmitTime time.Duration) bool { var lastSubmittedTime time.Time if m.LastSubmissionTime.Load() == 0 { firstBlock, err := m.Store.LoadBlock(uint64(m.Genesis.InitialHeight)) //nolint:gosec // height is non-negative and falls in int64 @@ -367,7 +367,7 @@ func (m *Manager) isLastBatchRecent(submitBatchTime time.Duration) bool { } else { lastSubmittedTime = time.Unix(0, m.LastSubmissionTime.Load()) } - return time.Since(lastSubmittedTime) < submitBatchTime + return time.Since(lastSubmittedTime) < maxBatchSubmitTime } func UpdateBatchSubmissionGauges(skewBytes uint64, skewBlocks uint64, skewTime time.Duration) {