Skip to content

Commit

Permalink
fix(submission): actually submit every max batch submission time and …
Browse files Browse the repository at this point in the history
…not after (#1283)
  • Loading branch information
srene authored Dec 20, 2024
1 parent 5881e3e commit e2308e3
Show file tree
Hide file tree
Showing 11 changed files with 83 additions and 49 deletions.
2 changes: 0 additions & 2 deletions block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,6 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta

}

// save last block time used to calculate batch skew time
m.LastBlockTime.Store(block.Header.GetTimestamp().UTC().UnixNano())
// Update the store:
// 1. Save the proposer for the current height to the store.
// 2. Update the proposer in the state in case of rotation.
Expand Down
39 changes: 9 additions & 30 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,8 @@ type Manager struct {
Cancel context.CancelFunc
Ctx context.Context

// LastBlockTimeInSettlement is the time of last submitted block, used to measure batch skew time
LastBlockTimeInSettlement atomic.Int64

// LastBlockTime is the time of last produced block, used to measure batch skew time
LastBlockTime atomic.Int64
// LastSubmissionTime is the time of last batch submitted in SL
LastSubmissionTime atomic.Int64

// mutex used to avoid stopping node when fork is detected but proposer is creating/sending fork batch
forkMu sync.Mutex
Expand Down Expand Up @@ -310,35 +307,27 @@ func (m *Manager) updateFromLastSettlementState() error {
m.logger.Error("Cannot fetch sequencer set from the Hub", "error", err)
}

// update latest height from SL
latestHeight, err := m.SLClient.GetLatestHeight()
// get latest submitted batch from SL
latestBatch, err := m.SLClient.GetLatestBatch()
if errors.Is(err, gerrc.ErrNotFound) {
// The SL hasn't got any batches for this chain yet.
m.logger.Info("No batches for chain found in SL.")
m.LastSettlementHeight.Store(uint64(m.Genesis.InitialHeight - 1)) //nolint:gosec // height is non-negative and falls in int64
m.LastBlockTimeInSettlement.Store(m.Genesis.GenesisTime.UTC().UnixNano())
return nil
}
if err != nil {
// TODO: separate between fresh rollapp and non-registered rollapp
return err
}

m.P2PClient.UpdateLatestSeenHeight(latestHeight)
if latestHeight >= m.State.NextHeight() {
m.UpdateTargetHeight(latestHeight)
m.P2PClient.UpdateLatestSeenHeight(latestBatch.EndHeight)
if latestBatch.EndHeight >= m.State.NextHeight() {
m.UpdateTargetHeight(latestBatch.EndHeight)
}

m.LastSettlementHeight.Store(latestHeight)

// init last block in settlement time in dymint state to calculate batch submit skew time
m.SetLastBlockTimeInSettlementFromHeight(latestHeight)
m.LastSettlementHeight.Store(latestBatch.EndHeight)
m.LastSubmissionTime.Store(latestBatch.CreationTime.UTC().UnixNano())

// init last block time in dymint state to calculate batch submit skew time
block, err := m.Store.LoadBlock(m.State.Height())
if err == nil {
m.LastBlockTime.Store(block.Header.GetTimestamp().UTC().UnixNano())
}
return nil
}

Expand Down Expand Up @@ -420,13 +409,3 @@ func (m *Manager) freezeNode(err error) {
uevent.MustPublish(m.Ctx, m.Pubsub, &events.DataHealthStatus{Error: err}, events.HealthStatusList)
m.Cancel()
}

// SetLastBlockTimeInSettlementFromHeight is used to initialize LastBlockTimeInSettlement from rollapp height in settlement
func (m *Manager) SetLastBlockTimeInSettlementFromHeight(lastSettlementHeight uint64) {
block, err := m.Store.LoadBlock(lastSettlementHeight)
if err != nil {
// if settlement height block is not found it will be updated after, when syncing
return
}
m.LastBlockTimeInSettlement.Store(block.Header.GetTimestamp().UTC().UnixNano())
}
4 changes: 2 additions & 2 deletions block/produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ func (m *Manager) ProduceBlockLoop(ctx context.Context, bytesProducedC chan int)
return nil
case bytesProducedC <- bytesProducedN:
default:
evt := &events.DataHealthStatus{Error: fmt.Errorf("Block production paused. Time between last block produced and last block submitted higher than max skew time: %s last block in settlement time: %s %w", m.Conf.MaxSkewTime, time.Unix(0, m.LastBlockTimeInSettlement.Load()), gerrc.ErrResourceExhausted)}
evt := &events.DataHealthStatus{Error: fmt.Errorf("Block production paused. Time between last block produced and last block submitted higher than max skew time: %s last block in settlement time: %s %w", m.Conf.MaxSkewTime, m.GetLastBlockTimeInSettlement(), gerrc.ErrResourceExhausted)}
uevent.MustPublish(ctx, m.Pubsub, evt, events.HealthStatusList)
m.logger.Error("Pausing block production until new batch is submitted.", "Batch skew time", m.GetBatchSkewTime(), "Max batch skew time", m.Conf.MaxSkewTime, "Last block in settlement time", time.Unix(0, m.LastBlockTimeInSettlement.Load()))
m.logger.Error("Pausing block production until new batch is submitted.", "Batch skew time", m.GetBatchSkewTime(), "Max batch skew time", m.Conf.MaxSkewTime, "Last block in settlement time", m.GetLastBlockTimeInSettlement())
select {
case <-ctx.Done():
return nil
Expand Down
2 changes: 1 addition & 1 deletion block/pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (m *Manager) PruningLoop(ctx context.Context) error {
case retainHeight := <-m.pruningC:
var pruningHeight uint64
if m.RunMode == RunModeProposer { // do not delete anything that we might submit in future
pruningHeight = min(m.NextHeightToSubmit(), uint64(retainHeight))
pruningHeight = min(m.LastSettlementHeight.Load(), uint64(retainHeight))
} else { // do not delete anything that is not validated yet
pruningHeight = min(m.SettlementValidator.NextValidationHeight(), uint64(retainHeight))
}
Expand Down
63 changes: 56 additions & 7 deletions block/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func (m *Manager) SubmitLoop(ctx context.Context,
m.GetUnsubmittedBlocks,
m.GetUnsubmittedBytes,
m.GetBatchSkewTime,
m.isLastBatchRecent,
m.Conf.BatchSubmitTime,
m.Conf.BatchSubmitBytes,
m.CreateAndSubmitBatchGetSizeBlocksCommits,
Expand All @@ -48,6 +49,7 @@ func SubmitLoopInner(
unsubmittedBlocksNum func() uint64, // func that returns the amount of non-submitted blocks
unsubmittedBlocksBytes func() int, // func that returns bytes from non-submitted blocks
batchSkewTime func() time.Duration, // func that returns measured time between last submitted block and last produced block
isLastBatchRecent func(time.Duration) bool, // func that returns true if the last batch submission time is more recent than batch submission time
maxBatchSubmitTime time.Duration, // max time to allow between batches
maxBatchSubmitBytes uint64, // max size of serialised batch in bytes
createAndSubmitBatch func(maxSizeBytes uint64) (bytes uint64, err error),
Expand Down Expand Up @@ -86,8 +88,8 @@ func SubmitLoopInner(
})

eg.Go(func() error {
// 'submitter': this thread actually creates and submits batches. this thread is woken up every batch_submit_time (in addition to every block produced) to check if there is anything to submit even if no new blocks have been produced
ticker := time.NewTicker(maxBatchSubmitTime)
// 'submitter': this thread actually creates and submits batches. this thread is woken up every batch_submit_time/10 (we used /10 to avoid waiting too much if submission is not required for t-maxBatchSubmitTime, but it maybe required before t) to check if submission is required even if no new blocks have been produced
ticker := time.NewTicker(maxBatchSubmitTime / 10)
for {
select {
case <-ctx.Done():
Expand All @@ -103,7 +105,7 @@ func SubmitLoopInner(
done := ctx.Err() != nil
nothingToSubmit := pending == 0

lastSubmissionIsRecent := batchSkewTime() < maxBatchSubmitTime
lastSubmissionIsRecent := isLastBatchRecent(maxBatchSubmitTime)
maxDataNotExceeded := pending <= maxBatchSubmitBytes

UpdateBatchSubmissionGauges(pending, unsubmittedBlocksNum(), batchSkewTime())
Expand Down Expand Up @@ -257,7 +259,7 @@ func (m *Manager) SubmitBatch(batch *types.Batch) error {
m.LastSettlementHeight.Store(batch.EndHeight())

// update last submitted block time with batch last block (used to calculate max skew time)
m.LastBlockTimeInSettlement.Store(batch.Blocks[len(batch.Blocks)-1].Header.GetTimestamp().UTC().UnixNano())
m.LastSubmissionTime.Store(time.Now().UTC().UnixNano())

return err
}
Expand Down Expand Up @@ -314,11 +316,58 @@ func (m *Manager) UpdateLastSubmittedHeight(event pubsub.Message) {
}
}

// GetLastBlockTime returns the time of the last produced block
func (m *Manager) GetLastProducedBlockTime() time.Time {
lastProducedBlock, err := m.Store.LoadBlock(m.State.Height())
if err != nil {
return time.Time{}
}
return lastProducedBlock.Header.GetTimestamp()
}

// GetLastBlockTimeInSettlement returns the time of the last submitted block to SL
// If no height in settlement returns first block time
// If no first block produced returns now
// If different error returns empty time
func (m *Manager) GetLastBlockTimeInSettlement() time.Time {
lastBlockInSettlement, err := m.Store.LoadBlock(m.LastSettlementHeight.Load())
if err != nil && !errors.Is(err, gerrc.ErrNotFound) {
return time.Time{}
}
if errors.Is(err, gerrc.ErrNotFound) {
firstBlock, err := m.Store.LoadBlock(uint64(m.Genesis.InitialHeight)) //nolint:gosec // height is non-negative and falls in int64
if errors.Is(err, gerrc.ErrNotFound) {
return time.Now()
}
if err != nil {
return time.Time{}
}
return firstBlock.Header.GetTimestamp()

}
return lastBlockInSettlement.Header.GetTimestamp()
}

// GetBatchSkewTime returns the time between the last produced block and the last block submitted to SL
func (m *Manager) GetBatchSkewTime() time.Duration {
lastProducedTime := time.Unix(0, m.LastBlockTime.Load())
lastSubmittedTime := time.Unix(0, m.LastBlockTimeInSettlement.Load())
return lastProducedTime.Sub(lastSubmittedTime)
return m.GetLastProducedBlockTime().Sub(m.GetLastBlockTimeInSettlement())
}

// isLastBatchRecent returns true if the last batch submitted is more recent than maxBatchSubmitTime
// in case of no submission time the first block produced is used as a reference.
func (m *Manager) isLastBatchRecent(maxBatchSubmitTime time.Duration) bool {
var lastSubmittedTime time.Time
if m.LastSubmissionTime.Load() == 0 {
firstBlock, err := m.Store.LoadBlock(uint64(m.Genesis.InitialHeight)) //nolint:gosec // height is non-negative and falls in int64
if err != nil {
return true
} else {
lastSubmittedTime = firstBlock.Header.GetTimestamp()
}
} else {
lastSubmittedTime = time.Unix(0, m.LastSubmissionTime.Load())
}
return time.Since(lastSubmittedTime) < maxBatchSubmitTime
}

func UpdateBatchSubmissionGauges(skewBytes uint64, skewBlocks uint64, skewTime time.Duration) {
Expand Down
5 changes: 4 additions & 1 deletion block/submit_loop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,11 @@ func testSubmitLoopInner(
pendingBytes := func() int {
return int(nProducedBytes.Load())
}
isLastBatchRecent := func(time.Duration) bool {
return true
}

block.SubmitLoopInner(ctx, log.NewNopLogger(), producedBytesC, args.batchSkew, accumulatedBlocks, pendingBytes, skewTime, args.maxTime, args.batchBytes, submitBatch)
block.SubmitLoopInner(ctx, log.NewNopLogger(), producedBytesC, args.batchSkew, accumulatedBlocks, pendingBytes, skewTime, isLastBatchRecent, args.maxTime, args.batchBytes, submitBatch)
}

// Make sure the producer does not get too far ahead
Expand Down
4 changes: 2 additions & 2 deletions block/submit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func TestSubmissionByTime(t *testing.T) {
manager.DAClient = testutil.GetMockDALC(log.TestingLogger())
manager.Retriever = manager.DAClient.(da.BatchRetriever)

manager.LastBlockTimeInSettlement.Store(time.Now().UTC().UnixNano())
manager.LastSubmissionTime.Store(time.Now().UTC().UnixNano())
// Check initial height
initialHeight := uint64(0)
require.Equal(initialHeight, manager.State.Height())
Expand Down Expand Up @@ -341,7 +341,7 @@ func TestSubmissionByBatchSize(t *testing.T) {
managerConfig.BatchSubmitBytes = c.blockBatchMaxSizeBytes
manager, err := testutil.GetManager(managerConfig, nil, 1, 1, 0, proxyApp, nil)
require.NoError(err)
manager.LastBlockTimeInSettlement.Store(time.Now().UTC().UnixNano())
manager.LastSubmissionTime.Store(time.Now().UTC().UnixNano())

manager.DAClient = testutil.GetMockDALC(log.TestingLogger())
manager.Retriever = manager.DAClient.(da.BatchRetriever)
Expand Down
4 changes: 2 additions & 2 deletions block/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ func (m *Manager) SettlementSyncLoop(ctx context.Context) error {
}
m.logger.Info("Retrieved state update from SL.", "state_index", settlementBatch.StateIndex)

// we update LastBlockTimeInSettlement to be able to measure batch skew time with last block time in settlement
m.LastBlockTimeInSettlement.Store(settlementBatch.BlockDescriptors[len(settlementBatch.BlockDescriptors)-1].GetTimestamp().UTC().UnixNano())
// we update LastSubmissionTime to be able to measure batch submission time
m.LastSubmissionTime.Store(settlementBatch.Batch.CreationTime.UTC().UnixNano())

err = m.ApplyBatchFromSL(settlementBatch.Batch)

Expand Down
1 change: 1 addition & 0 deletions settlement/dymension/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func convertStateInfoToResultRetrieveBatch(stateInfo *rollapptypes.StateInfo) (*
BlockDescriptors: stateInfo.BDs.BD,
NumBlocks: stateInfo.NumBlocks,
NextSequencer: stateInfo.NextProposer,
CreationTime: stateInfo.CreatedAt,
}

return &settlement.ResultRetrieveBatch{
Expand Down
1 change: 1 addition & 0 deletions settlement/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ func (c *Client) convertBatchToSettlementBatch(batch *types.Batch, daResult *da.
},
},
BlockDescriptors: bds,
CreationTime: time.Now(),
}

return settlementBatch
Expand Down
7 changes: 5 additions & 2 deletions settlement/settlement.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package settlement

import (
"time"

"github.com/tendermint/tendermint/libs/pubsub"

"github.com/dymensionxyz/dymint/da"
Expand Down Expand Up @@ -42,8 +44,9 @@ type Batch struct {
NextSequencer string

// MetaData about the batch in the DA layer
MetaData *BatchMetaData
NumBlocks uint64 // FIXME: can be removed. not used and will be deprecated
MetaData *BatchMetaData
NumBlocks uint64 // FIXME: can be removed. not used and will be deprecated
CreationTime time.Time
}

type ResultRetrieveBatch struct {
Expand Down

0 comments on commit e2308e3

Please sign in to comment.