Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

skew time using first block when no state update #1283

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,6 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta

}

// save last block time used to calculate batch skew time
m.LastBlockTime.Store(block.Header.GetTimestamp().UTC().UnixNano())
// Update the store:
// 1. Save the proposer for the current height to the store.
// 2. Update the proposer in the state in case of rotation.
Expand Down
26 changes: 1 addition & 25 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,6 @@ type Manager struct {
Cancel context.CancelFunc
Ctx context.Context

// LastBlockTimeInSettlement is the time of last submitted block, used to measure batch skew time
LastBlockTimeInSettlement atomic.Int64

// LastBlockTime is the time of last produced block, used to measure batch skew time
LastBlockTime atomic.Int64

// mutex used to avoid stopping node when fork is detected but proposer is creating/sending fork batch
forkMu sync.Mutex
/*
Expand Down Expand Up @@ -316,7 +310,7 @@ func (m *Manager) updateFromLastSettlementState() error {
// The SL hasn't got any batches for this chain yet.
m.logger.Info("No batches for chain found in SL.")
m.LastSettlementHeight.Store(uint64(m.Genesis.InitialHeight - 1)) //nolint:gosec // height is non-negative and falls in int64
m.LastBlockTimeInSettlement.Store(m.Genesis.GenesisTime.UTC().UnixNano())

return nil
}
if err != nil {
Expand All @@ -331,14 +325,6 @@ func (m *Manager) updateFromLastSettlementState() error {

m.LastSettlementHeight.Store(latestHeight)

// init last block in settlement time in dymint state to calculate batch submit skew time
m.SetLastBlockTimeInSettlementFromHeight(latestHeight)

// init last block time in dymint state to calculate batch submit skew time
block, err := m.Store.LoadBlock(m.State.Height())
if err == nil {
m.LastBlockTime.Store(block.Header.GetTimestamp().UTC().UnixNano())
}
return nil
}

Expand Down Expand Up @@ -420,13 +406,3 @@ func (m *Manager) freezeNode(err error) {
uevent.MustPublish(m.Ctx, m.Pubsub, &events.DataHealthStatus{Error: err}, events.HealthStatusList)
m.Cancel()
}

// SetLastBlockTimeInSettlementFromHeight initializes LastBlockTimeInSettlement from the
// locally stored block at lastSettlementHeight (the rollapp height in settlement).
// The block header timestamp is recorded as UTC Unix nanoseconds.
func (m *Manager) SetLastBlockTimeInSettlementFromHeight(lastSettlementHeight uint64) {
	settledBlock, loadErr := m.Store.LoadBlock(lastSettlementHeight)
	if loadErr == nil {
		settledTime := settledBlock.Header.GetTimestamp().UTC()
		m.LastBlockTimeInSettlement.Store(settledTime.UnixNano())
	}
	// When the block cannot be loaded nothing is stored here: the value is
	// expected to be updated afterwards, when syncing.
}
8 changes: 6 additions & 2 deletions block/produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,13 @@ func (m *Manager) ProduceBlockLoop(ctx context.Context, bytesProducedC chan int)
return nil
case bytesProducedC <- bytesProducedN:
default:
evt := &events.DataHealthStatus{Error: fmt.Errorf("Block production paused. Time between last block produced and last block submitted higher than max skew time: %s last block in settlement time: %s %w", m.Conf.MaxSkewTime, time.Unix(0, m.LastBlockTimeInSettlement.Load()), gerrc.ErrResourceExhausted)}
lastBlockTimeInSettlement, err := m.GetLastBlockInSettlementTime()
if err != nil {
m.logger.Error("Unable to get last block in settlement time")
}
evt := &events.DataHealthStatus{Error: fmt.Errorf("Block production paused. Time between last block produced and last block submitted higher than max skew time: %s last block in settlement time: %s %w", m.Conf.MaxSkewTime, lastBlockTimeInSettlement, gerrc.ErrResourceExhausted)}
uevent.MustPublish(ctx, m.Pubsub, evt, events.HealthStatusList)
m.logger.Error("Pausing block production until new batch is submitted.", "Batch skew time", m.GetBatchSkewTime(), "Max batch skew time", m.Conf.MaxSkewTime, "Last block in settlement time", time.Unix(0, m.LastBlockTimeInSettlement.Load()))
m.logger.Error("Pausing block production until new batch is submitted.", "Batch skew time", m.GetBatchSkewTime(), "Max batch skew time", m.Conf.MaxSkewTime, "Last block in settlement time", lastBlockTimeInSettlement)
select {
case <-ctx.Done():
return nil
Expand Down
34 changes: 28 additions & 6 deletions block/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,9 +256,6 @@ func (m *Manager) SubmitBatch(batch *types.Batch) error {
types.RollappHubHeightGauge.Set(float64(batch.EndHeight()))
m.LastSettlementHeight.Store(batch.EndHeight())

// update last submitted block time with batch last block (used to calculate max skew time)
m.LastBlockTimeInSettlement.Store(batch.Blocks[len(batch.Blocks)-1].Header.GetTimestamp().UTC().UnixNano())

return err
}

Expand Down Expand Up @@ -314,11 +311,36 @@ func (m *Manager) UpdateLastSubmittedHeight(event pubsub.Message) {
}
}

func (m *Manager) GetLastBlockInSettlementTime() (time.Time, error) {
lastBlockInSettlement, err := m.Store.LoadBlock(m.LastSettlementHeight.Load())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the block at LastSettlementHeight may get pruned, no?

pruningHeight = min(m.NextHeightToSubmit(), uint64(retainHeight))

If that's the case and we fall back to the first block, it may cause an unnecessary halt.

if err != nil && !errors.Is(err, gerrc.ErrNotFound) {
return time.Time{}, err
}
if errors.Is(err, gerrc.ErrNotFound) {
firstBlock, err := m.Store.LoadBlock(uint64(m.Genesis.InitialHeight))
if err != nil && !errors.Is(err, gerrc.ErrNotFound) {
return time.Time{}, err
}
if errors.Is(err, gerrc.ErrNotFound) {
return time.Now(), nil
}
return firstBlock.Header.GetTimestamp(), nil

}
return lastBlockInSettlement.Header.GetTimestamp(), nil
}

// GetBatchSkewTime returns the time between the last produced block and the last block submitted to SL
func (m *Manager) GetBatchSkewTime() time.Duration {
lastProducedTime := time.Unix(0, m.LastBlockTime.Load())
lastSubmittedTime := time.Unix(0, m.LastBlockTimeInSettlement.Load())
return lastProducedTime.Sub(lastSubmittedTime)
lastBlockProduced, err := m.Store.LoadBlock(m.State.Height())
if err != nil {
return 0
}
lastBlockInSettlementTime, err := m.GetLastBlockInSettlementTime()
if err != nil {
return 0
}
return lastBlockProduced.Header.GetTimestamp().Sub(lastBlockInSettlementTime)
}

func UpdateBatchSubmissionGauges(skewBytes uint64, skewBlocks uint64, skewTime time.Duration) {
Expand Down
2 changes: 0 additions & 2 deletions block/submit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,6 @@ func TestSubmissionByTime(t *testing.T) {
manager.DAClient = testutil.GetMockDALC(log.TestingLogger())
manager.Retriever = manager.DAClient.(da.BatchRetriever)

manager.LastBlockTimeInSettlement.Store(time.Now().UTC().UnixNano())
// Check initial height
initialHeight := uint64(0)
require.Equal(initialHeight, manager.State.Height())
Expand Down Expand Up @@ -341,7 +340,6 @@ func TestSubmissionByBatchSize(t *testing.T) {
managerConfig.BatchSubmitBytes = c.blockBatchMaxSizeBytes
manager, err := testutil.GetManager(managerConfig, nil, 1, 1, 0, proxyApp, nil)
require.NoError(err)
manager.LastBlockTimeInSettlement.Store(time.Now().UTC().UnixNano())

manager.DAClient = testutil.GetMockDALC(log.TestingLogger())
manager.Retriever = manager.DAClient.(da.BatchRetriever)
Expand Down
3 changes: 0 additions & 3 deletions block/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,6 @@ func (m *Manager) SettlementSyncLoop(ctx context.Context) error {
}
m.logger.Info("Retrieved state update from SL.", "state_index", settlementBatch.StateIndex)

// we update LastBlockTimeInSettlement to be able to measure batch skew time with last block time in settlement
m.LastBlockTimeInSettlement.Store(settlementBatch.BlockDescriptors[len(settlementBatch.BlockDescriptors)-1].GetTimestamp().UTC().UnixNano())

err = m.ApplyBatchFromSL(settlementBatch.Batch)

// this will keep sync loop alive when DA is down or retrievals are failing because DA issues.
Expand Down
Loading