Skip to content

Commit

Permalink
skew time using first block when no state update
Browse files Browse the repository at this point in the history
  • Loading branch information
srene committed Dec 16, 2024
1 parent 2a07ba3 commit 6c8aa77
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 40 deletions.
2 changes: 0 additions & 2 deletions block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,6 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta

}

// save last block time used to calculate batch skew time
m.LastBlockTime.Store(block.Header.GetTimestamp().UTC().UnixNano())
// Update the store:
// 1. Save the proposer for the current height to the store.
// 2. Update the proposer in the state in case of rotation.
Expand Down
26 changes: 1 addition & 25 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,6 @@ type Manager struct {
Cancel context.CancelFunc
Ctx context.Context

// LastBlockTimeInSettlement is the time of last submitted block, used to measure batch skew time
LastBlockTimeInSettlement atomic.Int64

// LastBlockTime is the time of last produced block, used to measure batch skew time
LastBlockTime atomic.Int64

// mutex used to avoid stopping node when fork is detected but proposer is creating/sending fork batch
forkMu sync.Mutex
/*
Expand Down Expand Up @@ -316,7 +310,7 @@ func (m *Manager) updateFromLastSettlementState() error {
// The SL hasn't got any batches for this chain yet.
m.logger.Info("No batches for chain found in SL.")
m.LastSettlementHeight.Store(uint64(m.Genesis.InitialHeight - 1)) //nolint:gosec // height is non-negative and falls in int64
m.LastBlockTimeInSettlement.Store(m.Genesis.GenesisTime.UTC().UnixNano())

return nil
}
if err != nil {
Expand All @@ -331,14 +325,6 @@ func (m *Manager) updateFromLastSettlementState() error {

m.LastSettlementHeight.Store(latestHeight)

// init last block in settlement time in dymint state to calculate batch submit skew time
m.SetLastBlockTimeInSettlementFromHeight(latestHeight)

// init last block time in dymint state to calculate batch submit skew time
block, err := m.Store.LoadBlock(m.State.Height())
if err == nil {
m.LastBlockTime.Store(block.Header.GetTimestamp().UTC().UnixNano())
}
return nil
}

Expand Down Expand Up @@ -420,13 +406,3 @@ func (m *Manager) freezeNode(err error) {
uevent.MustPublish(m.Ctx, m.Pubsub, &events.DataHealthStatus{Error: err}, events.HealthStatusList)
m.Cancel()
}

// SetLastBlockTimeInSettlementFromHeight is used to initialize LastBlockTimeInSettlement from rollapp height in settlement
func (m *Manager) SetLastBlockTimeInSettlementFromHeight(lastSettlementHeight uint64) {
block, err := m.Store.LoadBlock(lastSettlementHeight)
if err != nil {
// if settlement height block is not found it will be updated after, when syncing
return
}
m.LastBlockTimeInSettlement.Store(block.Header.GetTimestamp().UTC().UnixNano())
}
4 changes: 2 additions & 2 deletions block/produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ func (m *Manager) ProduceBlockLoop(ctx context.Context, bytesProducedC chan int)
return nil
case bytesProducedC <- bytesProducedN:
default:
evt := &events.DataHealthStatus{Error: fmt.Errorf("Block production paused. Time between last block produced and last block submitted higher than max skew time: %s last block in settlement time: %s %w", m.Conf.MaxSkewTime, time.Unix(0, m.LastBlockTimeInSettlement.Load()), gerrc.ErrResourceExhausted)}
evt := &events.DataHealthStatus{Error: fmt.Errorf("Block production paused. Time between last block produced and last block submitted higher than max skew time: %s last block in settlement time: %s %w", m.Conf.MaxSkewTime, m.GetLastBlockInSettlementTime(), gerrc.ErrResourceExhausted)}
uevent.MustPublish(ctx, m.Pubsub, evt, events.HealthStatusList)
m.logger.Error("Pausing block production until new batch is submitted.", "Batch skew time", m.GetBatchSkewTime(), "Max batch skew time", m.Conf.MaxSkewTime, "Last block in settlement time", time.Unix(0, m.LastBlockTimeInSettlement.Load()))
m.logger.Error("Pausing block production until new batch is submitted.", "Batch skew time", m.GetBatchSkewTime(), "Max batch skew time", m.Conf.MaxSkewTime, "Last block in settlement time", m.GetLastBlockInSettlementTime())
select {
case <-ctx.Done():
return nil
Expand Down
23 changes: 17 additions & 6 deletions block/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,9 +256,6 @@ func (m *Manager) SubmitBatch(batch *types.Batch) error {
types.RollappHubHeightGauge.Set(float64(batch.EndHeight()))
m.LastSettlementHeight.Store(batch.EndHeight())

// update last submitted block time with batch last block (used to calculate max skew time)
m.LastBlockTimeInSettlement.Store(batch.Blocks[len(batch.Blocks)-1].Header.GetTimestamp().UTC().UnixNano())

return err
}

Expand Down Expand Up @@ -314,11 +311,25 @@ func (m *Manager) UpdateLastSubmittedHeight(event pubsub.Message) {
}
}

func (m *Manager) GetLastBlockInSettlementTime() time.Time {
lastBlockInSettlement, err := m.Store.LoadBlock(m.LastSettlementHeight.Load())
if err != nil {
firstBlock, err := m.Store.LoadBlock(uint64(m.Genesis.InitialHeight))
if err != nil {
return time.Now()
}
return firstBlock.Header.GetTimestamp()
}
return lastBlockInSettlement.Header.GetTimestamp()
}

// GetBatchSkewTime returns the time between the last produced block and the last block submitted to SL
func (m *Manager) GetBatchSkewTime() time.Duration {
lastProducedTime := time.Unix(0, m.LastBlockTime.Load())
lastSubmittedTime := time.Unix(0, m.LastBlockTimeInSettlement.Load())
return lastProducedTime.Sub(lastSubmittedTime)
lastBlockProduced, err := m.Store.LoadBlock(m.State.Height())
if err != nil {
return 0
}
return lastBlockProduced.Header.GetTimestamp().Sub(m.GetLastBlockInSettlementTime())
}

func UpdateBatchSubmissionGauges(skewBytes uint64, skewBlocks uint64, skewTime time.Duration) {
Expand Down
2 changes: 0 additions & 2 deletions block/submit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,6 @@ func TestSubmissionByTime(t *testing.T) {
manager.DAClient = testutil.GetMockDALC(log.TestingLogger())
manager.Retriever = manager.DAClient.(da.BatchRetriever)

manager.LastBlockTimeInSettlement.Store(time.Now().UTC().UnixNano())
// Check initial height
initialHeight := uint64(0)
require.Equal(initialHeight, manager.State.Height())
Expand Down Expand Up @@ -341,7 +340,6 @@ func TestSubmissionByBatchSize(t *testing.T) {
managerConfig.BatchSubmitBytes = c.blockBatchMaxSizeBytes
manager, err := testutil.GetManager(managerConfig, nil, 1, 1, 0, proxyApp, nil)
require.NoError(err)
manager.LastBlockTimeInSettlement.Store(time.Now().UTC().UnixNano())

manager.DAClient = testutil.GetMockDALC(log.TestingLogger())
manager.Retriever = manager.DAClient.(da.BatchRetriever)
Expand Down
3 changes: 0 additions & 3 deletions block/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,6 @@ func (m *Manager) SettlementSyncLoop(ctx context.Context) error {
}
m.logger.Info("Retrieved state update from SL.", "state_index", settlementBatch.StateIndex)

// we update LastBlockTimeInSettlement to be able to measure batch skew time with last block time in settlement
m.LastBlockTimeInSettlement.Store(settlementBatch.BlockDescriptors[len(settlementBatch.BlockDescriptors)-1].GetTimestamp().UTC().UnixNano())

err = m.ApplyBatchFromSL(settlementBatch.Batch)

// this will keep sync loop alive when DA is down or retrievals are failing because DA issues.
Expand Down

0 comments on commit 6c8aa77

Please sign in to comment.