feat(manager): max skew based on time instead of batches (#1140)
srene authored Nov 21, 2024
1 parent f503231 commit 34018b9
Showing 29 changed files with 323 additions and 193 deletions.
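Note on the change as a whole: back-pressure on block production was previously expressed as a maximum amount of pending unsubmitted data (roughly, the BatchSkew batch count times the max batch size in bytes); with this commit it is expressed as a maximum time gap (MaxSkewTime) between the last produced block and the last block submitted to the settlement layer. A minimal, self-contained sketch of the new criterion, with illustrative values rather than dymint defaults:

package main

import (
	"fmt"
	"time"
)

func main() {
	// illustrative value; the real limit comes from m.Conf.MaxSkewTime
	maxSkewTime := 1 * time.Hour

	lastSubmitted := time.Now().Add(-90 * time.Minute) // timestamp of the last block settled on the SL
	lastProduced := time.Now()                         // timestamp of the last produced block

	skew := lastProduced.Sub(lastSubmitted)
	fmt.Println("batch skew time:", skew, "pause production:", skew > maxSkewTime) // pause: true
}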
4 changes: 3 additions & 1 deletion block/block.go
@@ -121,12 +121,14 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
m.logger.Debug("pruning channel full. skipping pruning", "retainHeight", retainHeight)
}
}

// Update the state with the new app hash, and store height from the commit.
// Each of these, if it happens before the commit, prevents us from re-executing the block in case of a failure during the commit.
m.Executor.UpdateStateAfterCommit(m.State, responses, appHash, block.Header.Height, block.Header.Hash())

}

// save last block time used to calculate batch skew time
m.LastBlockTime.Store(block.Header.GetTimestamp().UTC().UnixNano())
// Update the store:
// 1. Save the proposer for the current height to the store.
// 2. Update the proposer in the state in case of rotation.
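Note on the atomic used above: the timestamp is written here in the block-processing path and read concurrently from the submit loop via GetBatchSkewTime (block/submit.go below), so it is kept as an atomic.Int64 of UnixNano rather than a time.Time behind a mutex. A minimal sketch of the round-trip, outside the dymint codebase:

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

func main() {
	var lastBlockTime atomic.Int64 // nanoseconds since epoch, mirroring Manager.LastBlockTime

	// writer side (applyBlock): record the block header timestamp
	lastBlockTime.Store(time.Now().UTC().UnixNano())

	// reader side (GetBatchSkewTime): reconstruct the time.Time
	fmt.Println(time.Unix(0, lastBlockTime.Load()).UTC())
}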
1 change: 0 additions & 1 deletion block/executor.go
@@ -175,7 +175,6 @@ func (e *Executor) CreateBlock(
copy(block.Header.DataHash[:], types.GetDataHash(block))
copy(block.Header.SequencerHash[:], state.GetProposerHash())
copy(block.Header.NextSequencersHash[:], nextSeqHash[:])

return block
}

2 changes: 2 additions & 0 deletions block/initchain.go
@@ -22,11 +22,13 @@ func (m *Manager) RunInitChain(ctx context.Context) error {
if err != nil {
return err
}

// update the state with only the consensus pubkey
m.Executor.UpdateStateAfterInitChain(m.State, res)
m.Executor.UpdateMempoolAfterInitChain(m.State)
if _, err := m.Store.SaveState(m.State, nil); err != nil {
return err
}

return nil
}
30 changes: 27 additions & 3 deletions block/manager.go
@@ -72,6 +72,12 @@ type Manager struct {
// context used when freezing node
Cancel context.CancelFunc
Ctx context.Context

// LastBlockTimeInSettlement is the time of the last submitted block, used to measure the batch skew time
LastBlockTimeInSettlement atomic.Int64

// LastBlockTime is the time of the last produced block, used to measure the batch skew time
LastBlockTime atomic.Int64
/*
Sequencer and full-node
*/
@@ -303,20 +309,28 @@ func (m *Manager) updateFromLastSettlementState() error {
// The SL hasn't got any batches for this chain yet.
m.logger.Info("No batches for chain found in SL.")
m.LastSettlementHeight.Store(uint64(m.Genesis.InitialHeight - 1))
m.LastBlockTimeInSettlement.Store(m.Genesis.GenesisTime.UTC().UnixNano())
return nil
}

if err != nil {
// TODO: separate between fresh rollapp and non-registered rollapp
return err
}

m.LastSettlementHeight.Store(latestHeight)

if latestHeight >= m.State.NextHeight() {
m.UpdateTargetHeight(latestHeight)
}

m.LastSettlementHeight.Store(latestHeight)

// init the last settled block time in dymint state, used to calculate the batch submission skew time
m.SetLastBlockTimeInSettlementFromHeight(latestHeight)

// init the last block time in dymint state, used to calculate the batch submission skew time
block, err := m.Store.LoadBlock(m.State.Height())
if err == nil {
m.LastBlockTime.Store(block.Header.GetTimestamp().UTC().UnixNano())
}
return nil
}

@@ -402,3 +416,13 @@ func (m *Manager) freezeNode(err error) {
uevent.MustPublish(m.Ctx, m.Pubsub, &events.DataHealthStatus{Error: err}, events.HealthStatusList)
m.Cancel()
}

// SetLastBlockTimeInSettlementFromHeight is used to initialize LastBlockTimeInSettlement from rollapp height in settlement
func (m *Manager) SetLastBlockTimeInSettlementFromHeight(lastSettlementHeight uint64) {
block, err := m.Store.LoadBlock(lastSettlementHeight)
if err != nil {
// if the block at the settlement height is not found locally, it will be updated later, while syncing
return
}
m.LastBlockTimeInSettlement.Store(block.Header.GetTimestamp().UTC().UnixNano())
}
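To summarize the initialization above as a standalone sketch (the names and the loader are illustrative stand-ins, not the dymint API): a fresh chain with no batches on the SL falls back to the genesis time; otherwise the timestamp of the block at the latest settlement height is used if that block is already in the local store, and is filled in later while syncing if it is not.

package main

import (
	"errors"
	"fmt"
	"time"
)

// initLastBlockTimeInSettlement mirrors the precedence in updateFromLastSettlementState.
// loadBlockTime stands in for loading a block from the local store and reading its header timestamp.
func initLastBlockTimeInSettlement(
	freshChain bool,
	genesisTime time.Time,
	latestSettlementHeight uint64,
	loadBlockTime func(h uint64) (time.Time, error),
) (int64, bool) {
	if freshChain {
		return genesisTime.UTC().UnixNano(), true // no batches on the SL yet
	}
	t, err := loadBlockTime(latestSettlementHeight)
	if err != nil {
		return 0, false // block not found locally: set later, while syncing
	}
	return t.UTC().UnixNano(), true
}

func main() {
	genesis := time.Date(2024, 11, 21, 0, 0, 0, 0, time.UTC)
	notInStore := func(uint64) (time.Time, error) { return time.Time{}, errors.New("not found") }

	ns, ok := initLastBlockTimeInSettlement(true, genesis, 0, notInStore)
	fmt.Println(ns, ok) // genesis time as UnixNano, true

	ns, ok = initLastBlockTimeInSettlement(false, genesis, 42, notInStore)
	fmt.Println(ns, ok) // 0, false: deferred until the block is synced
}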
2 changes: 1 addition & 1 deletion block/manager_test.go
@@ -190,7 +190,7 @@ func TestProduceOnlyAfterSynced(t *testing.T) {
assert.True(t, manager.State.Height() == 0)

// enough time to sync and produce blocks
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*4)
defer cancel()
// Capture the error returned by manager.Start.

7 changes: 3 additions & 4 deletions block/produce.go
@@ -76,16 +76,15 @@ func (m *Manager) ProduceBlockLoop(ctx context.Context, bytesProducedC chan int)
}

bytesProducedN := block.SizeBytes() + commit.SizeBytes()
m.logger.Info("New block.", "size", uint64(block.ToProto().Size()))

select {
case <-ctx.Done():
return nil
case bytesProducedC <- bytesProducedN:
default:
evt := &events.DataHealthStatus{Error: fmt.Errorf("bytes produced channel is full: %w", gerrc.ErrResourceExhausted)}
uevent.MustPublish(ctx, m.Pubsub, evt, events.HealthStatusList)
m.logger.Error("Enough bytes to build a batch have been accumulated, but too many batches are pending submission. " +
"Pausing block production until a signal is consumed.")
m.logger.Error("Pausing block production until new batch is submitted.", "Batch skew time", m.GetBatchSkewTime(), "Max batch skew time", m.Conf.MaxSkewTime)
select {
case <-ctx.Done():
return nil
@@ -184,7 +183,7 @@ func (m *Manager) produceBlock(opts ProduceBlockOptions) (*types.Block, *types.C
return nil, nil, fmt.Errorf("create commit: %w: %w", err, ErrNonRecoverable)
}

m.logger.Info("Block created.", "height", newHeight, "num_tx", len(block.Data.Txs))
m.logger.Info("Block created.", "height", newHeight, "num_tx", len(block.Data.Txs), "size", block.SizeBytes()+commit.SizeBytes())
types.RollappBlockSizeBytesGauge.Set(float64(len(block.Data.Txs)))
types.RollappBlockSizeTxsGauge.Set(float64(len(block.Data.Txs)))
return block, commit, nil
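Note on the full-channel branch above: the send to bytesProducedC is non-blocking, so when the submit side has not drained the channel the default case fires, a health event is published, and production pauses. A minimal, self-contained sketch of that select pattern (the buffered channel here is only for illustration):

package main

import "fmt"

func main() {
	bytesProducedC := make(chan int, 1)
	bytesProducedC <- 100 // channel already full: the submitter has not consumed it yet

	select {
	case bytesProducedC <- 42:
		fmt.Println("handed produced bytes to the submit loop")
	default:
		fmt.Println("channel full: pausing block production until a new batch is submitted")
	}
}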
1 change: 0 additions & 1 deletion block/state.go
@@ -122,7 +122,6 @@ func (e *Executor) UpdateStateAfterCommit(s *types.State, resp *tmstate.ABCIResp
copy(s.LastHeaderHash[:], lastHeaderHash[:])

s.SetHeight(height)

if resp.EndBlock.ConsensusParamUpdates != nil {
s.ConsensusParams.Block.MaxGas = resp.EndBlock.ConsensusParamUpdates.Block.MaxGas
s.ConsensusParams.Block.MaxBytes = resp.EndBlock.ConsensusParamUpdates.Block.MaxBytes
81 changes: 49 additions & 32 deletions block/submit.go
@@ -14,7 +14,6 @@ import (
"github.com/dymensionxyz/dymint/da"
"github.com/dymensionxyz/dymint/settlement"
"github.com/dymensionxyz/dymint/types"
uatomic "github.com/dymensionxyz/dymint/utils/atomic"
uchannel "github.com/dymensionxyz/dymint/utils/channel"
)

@@ -30,8 +29,10 @@ func (m *Manager) SubmitLoop(ctx context.Context,
ctx,
m.logger,
bytesProduced,
m.Conf.BatchSkew,
m.Conf.MaxSkewTime,
m.GetUnsubmittedBlocks,
m.GetUnsubmittedBytes,
m.GetBatchSkewTime,
m.Conf.BatchSubmitTime,
m.Conf.BatchSubmitBytes,
m.CreateAndSubmitBatchGetSizeBlocksCommits,
@@ -43,11 +44,13 @@ func SubmitLoopInner(
ctx context.Context,
logger types.Logger,
bytesProduced chan int, // a channel of block and commit bytes produced
maxBatchSkew uint64, // max number of blocks that submitter is allowed to have pending
unsubmittedBlocks func() uint64,
maxBatchTime time.Duration, // max time to allow between batches
maxBatchBytes uint64, // max size of serialised batch in bytes
createAndSubmitBatch func(maxSizeBytes uint64) (sizeBlocksCommits uint64, err error),
maxProduceSubmitSkewTime time.Duration, // max time allowed between the last submitted block and the last produced block; if this threshold is reached, block production is stopped.
unsubmittedBlocksNum func() uint64,
unsubmittedBlocksBytes func() int,
batchSkewTime func() time.Duration,
maxBatchSubmitTime time.Duration, // max time to allow between batches
maxBatchSubmitBytes uint64, // max size of serialised batch in bytes
createAndSubmitBatch func(maxSizeBytes uint64) (bytes uint64, err error),
) error {
eg, ctx := errgroup.WithContext(ctx)

@@ -60,34 +63,32 @@
// 'trigger': this thread is responsible for waking up the submitter when a new block arrives, and back-pressures the block production loop
// if it gets too far ahead.
for {
if maxBatchSkew*maxBatchBytes < pendingBytes.Load() {
select {
case <-ctx.Done():
return ctx.Err()
case n := <-bytesProduced:
pendingBytes.Add(uint64(n))
logger.Debug("Added bytes produced to bytes pending submission counter.", "bytes added", n, "pending", pendingBytes.Load())
}

submitter.Nudge()

if maxProduceSubmitSkewTime < batchSkewTime() {
// too much stuff is pending submission
// we block here until we get a progress nudge from the submitter thread
select {
case <-ctx.Done():
return ctx.Err()
case <-trigger.C:
}
} else {
select {
case <-ctx.Done():
return ctx.Err()
case n := <-bytesProduced:
pendingBytes.Add(uint64(n))
logger.Debug("Added bytes produced to bytes pending submission counter.", "bytes added", n, "pending", pendingBytes.Load())
}
}

types.RollappPendingSubmissionsSkewBytes.Set(float64(pendingBytes.Load()))
types.RollappPendingSubmissionsSkewBlocks.Set(float64(unsubmittedBlocks()))
submitter.Nudge()
}
})

eg.Go(func() error {
// 'submitter': this thread actually creates and submits batches, and will do it on a timer if it isn't nudged by block production
timeLastSubmission := time.Now()
ticker := time.NewTicker(maxBatchTime / 10) // interval does not need to match max batch time since we keep track anyway, it's just to wakeup
ticker := time.NewTicker(maxBatchSubmitTime / 10) // interval does not need to match max batch time since we keep track anyway, it's just to wakeup
for {
select {
case <-ctx.Done():
@@ -97,21 +98,22 @@
}

pending := pendingBytes.Load()
types.RollappPendingSubmissionsSkewBytes.Set(float64(pendingBytes.Load()))
types.RollappPendingSubmissionsSkewBlocks.Set(float64(unsubmittedBlocks()))
types.RollappPendingSubmissionsSkewBatches.Set(float64(pendingBytes.Load() / maxBatchBytes))

// while there are accumulated blocks, create and submit batches!!
for {
done := ctx.Err() != nil
nothingToSubmit := pending == 0
lastSubmissionIsRecent := time.Since(timeLastSubmission) < maxBatchTime
maxDataNotExceeded := pending <= maxBatchBytes

lastSubmissionIsRecent := batchSkewTime() < maxBatchSubmitTime
maxDataNotExceeded := pending <= maxBatchSubmitBytes

UpdateBatchSubmissionGauges(pending, unsubmittedBlocksNum(), batchSkewTime())

if done || nothingToSubmit || (lastSubmissionIsRecent && maxDataNotExceeded) {
break
}

nConsumed, err := createAndSubmitBatch(min(pending, maxBatchBytes))
nConsumed, err := createAndSubmitBatch(maxBatchSubmitBytes)
if err != nil {
err = fmt.Errorf("create and submit batch: %w", err)
if errors.Is(err, gerrc.ErrInternal) {
@@ -126,9 +128,8 @@
}
return err
}
timeLastSubmission = time.Now()
ticker.Reset(maxBatchTime)
pending = uatomic.Uint64Sub(&pendingBytes, nConsumed)
ticker.Reset(maxBatchSubmitTime)
pending = uint64(unsubmittedBlocksBytes())
logger.Info("Submitted a batch to both sub-layers.", "n bytes consumed from pending", nConsumed, "pending after", pending) // TODO: debug level
}
trigger.Nudge()
@@ -219,7 +220,7 @@ func (m *Manager) CreateBatch(maxBatchSize uint64, startHeight uint64, endHeight
batch.DRSVersion = batch.DRSVersion[:len(batch.DRSVersion)-1]

if h == startHeight {
return nil, fmt.Errorf("block size exceeds max batch size: h %d: size: %d: %w", h, totalSize, gerrc.ErrOutOfRange)
return nil, fmt.Errorf("block size exceeds max batch size: h %d: batch size: %d: max size: %d err:%w", h, totalSize, maxBatchSize, gerrc.ErrOutOfRange)
}
break
}
@@ -246,7 +247,10 @@ func (m *Manager) SubmitBatch(batch *types.Batch) error {
types.RollappHubHeightGauge.Set(float64(batch.EndHeight()))
m.LastSettlementHeight.Store(batch.EndHeight())

return nil
// update the last submitted block time with the batch's last block (used to calculate the batch skew time)
m.LastBlockTimeInSettlement.Store(batch.Blocks[len(batch.Blocks)-1].Header.GetTimestamp().UTC().UnixNano())

return err
}

// GetUnsubmittedBytes returns the total number of unsubmitted bytes produced an element on a channel
@@ -300,3 +304,16 @@ func (m *Manager) UpdateLastSubmittedHeight(event pubsub.Message) {
}
}
}

// GetBatchSkewTime returns the time between the last produced block and the last block submitted to SL
func (m *Manager) GetBatchSkewTime() time.Duration {
lastProducedTime := time.Unix(0, m.LastBlockTime.Load())
lastSubmittedTime := time.Unix(0, m.LastBlockTimeInSettlement.Load())
return lastProducedTime.Sub(lastSubmittedTime)
}

func UpdateBatchSubmissionGauges(skewBytes uint64, skewBlocks uint64, skewTime time.Duration) {
types.RollappPendingSubmissionsSkewBytes.Set(float64(skewBytes))
types.RollappPendingSubmissionsSkewBlocks.Set(float64(skewBlocks))
types.RollappPendingSubmissionsSkewTimeMinutes.Set(float64(skewTime.Minutes()))
}
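Note the second place the skew time is used above: the submitter's "recent submission" check. Previously it compared wall-clock time since the last submission (timeLastSubmission); now a batch is considered due once the skew time itself reaches the batch submit time, or once enough bytes accumulate. A condensed, self-contained sketch of that predicate, with illustrative thresholds:

package main

import (
	"fmt"
	"time"
)

// shouldSubmit condenses the decision in SubmitLoopInner: submit when something is
// pending and either the skew time has reached the batch submit time or the pending
// bytes exceed the max batch size.
func shouldSubmit(pending uint64, skew, maxBatchSubmitTime time.Duration, maxBatchSubmitBytes uint64) bool {
	nothingToSubmit := pending == 0
	lastSubmissionIsRecent := skew < maxBatchSubmitTime
	maxDataNotExceeded := pending <= maxBatchSubmitBytes
	return !(nothingToSubmit || (lastSubmissionIsRecent && maxDataNotExceeded))
}

func main() {
	fmt.Println(shouldSubmit(0, 2*time.Hour, time.Hour, 500_000))        // false: nothing pending
	fmt.Println(shouldSubmit(1_000, 10*time.Minute, time.Hour, 500_000)) // false: recent enough, small enough
	fmt.Println(shouldSubmit(1_000, 2*time.Hour, time.Hour, 500_000))    // true: skew reached the submit time
	fmt.Println(shouldSubmit(600_000, time.Minute, time.Hour, 500_000))  // true: size threshold exceeded
}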