Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(manager): max skew based on time instead of batches #1140

Merged
merged 57 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
2f66f5e
skew based on time
srene Oct 15, 2024
3e92814
test fix
srene Oct 15, 2024
8f7c678
new skey claculatuin
srene Oct 16, 2024
72f014a
fixing test + bds in mock
srene Oct 16, 2024
02f5ab7
lint fix
srene Oct 16, 2024
306f6e4
test adapted
srene Oct 16, 2024
a48066b
fix + test
srene Oct 16, 2024
58adc23
waiting for batch submission fix
srene Oct 17, 2024
d40e330
avoid waiting too much on startup
srene Oct 17, 2024
e169eda
fixing race conditions
srene Oct 17, 2024
0034ec8
lint fix
srene Oct 17, 2024
c7168c9
update last submitted block time
srene Oct 17, 2024
f37498b
test fix
srene Oct 18, 2024
80fcaf0
comment added
srene Oct 18, 2024
5667a91
test update
srene Oct 18, 2024
7e28f95
merge fix
srene Nov 4, 2024
349ee0e
fix after merge
srene Nov 4, 2024
d9dcfbd
store last settlement block time on store
srene Nov 4, 2024
10af08c
remove redundant call
srene Nov 4, 2024
0892543
move code to submit file
srene Nov 4, 2024
2bb1cf1
wip
srene Nov 5, 2024
d61d19a
skew fix
srene Nov 5, 2024
0169c1d
fix test
srene Nov 5, 2024
3544858
lint fix
srene Nov 5, 2024
d4d1037
var rename
srene Nov 5, 2024
723b243
test
srene Nov 5, 2024
d3cf946
test
srene Nov 5, 2024
c091f71
remove prints
srene Nov 6, 2024
cb3c54a
test update
srene Nov 6, 2024
e5b5a99
merge fix
srene Nov 8, 2024
b79248a
main merge
srene Nov 19, 2024
ad4bf06
fix after merge
srene Nov 19, 2024
7cf3883
addressing comments
srene Nov 19, 2024
a6b23de
rdk bump
srene Nov 19, 2024
247bc27
fix
srene Nov 19, 2024
8b04b99
lint fix
srene Nov 19, 2024
c8d0078
fix test
srene Nov 19, 2024
c190fe6
last block time fix
srene Nov 19, 2024
113fec4
lint fix
srene Nov 19, 2024
8b284c3
log removed
srene Nov 19, 2024
f227659
addressing comments
srene Nov 19, 2024
caf8f5c
move update block time
srene Nov 19, 2024
fac407c
param rename
srene Nov 19, 2024
3f87777
lint fix
srene Nov 19, 2024
fe1af66
wip
srene Nov 19, 2024
e703849
time to atomic int
srene Nov 19, 2024
373c2cb
fix race cond
srene Nov 19, 2024
764fc02
fix test
srene Nov 19, 2024
4405eb5
merge
srene Nov 19, 2024
3dd3e9a
merge fix
srene Nov 19, 2024
cf34896
merge fix
srene Nov 19, 2024
86bbc31
test fix
srene Nov 20, 2024
4e9dc27
improving logs
srene Nov 20, 2024
e28cec4
improving comments
srene Nov 20, 2024
87d785d
comments
srene Nov 21, 2024
e4de0e6
fix
srene Nov 21, 2024
e398d25
lint fix
srene Nov 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,14 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
m.logger.Debug("pruning channel full. skipping pruning", "retainHeight", retainHeight)
}
}

// Update the state with the new app hash, and store height from the commit.
// Every one of those, if happens before commit, prevents us from re-executing the block in case failed during commit.
m.Executor.UpdateStateAfterCommit(m.State, responses, appHash, block.Header.Height, block.Header.Hash())

}

// save last block time used to calculate batch skew time
m.LastBlockTime.Store(block.Header.GetTimestamp().UTC().UnixNano())
// Update the store:
// 1. Save the proposer for the current height to the store.
// 2. Update the proposer in the state in case of rotation.
Expand Down
1 change: 0 additions & 1 deletion block/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,6 @@ func (e *Executor) CreateBlock(
copy(block.Header.DataHash[:], types.GetDataHash(block))
copy(block.Header.SequencerHash[:], state.GetProposerHash())
copy(block.Header.NextSequencersHash[:], nextSeqHash[:])

return block
}

Expand Down
2 changes: 2 additions & 0 deletions block/initchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ func (m *Manager) RunInitChain(ctx context.Context) error {
if err != nil {
return err
}

// update the state with only the consensus pubkey
m.Executor.UpdateStateAfterInitChain(m.State, res)
m.Executor.UpdateMempoolAfterInitChain(m.State)
if _, err := m.Store.SaveState(m.State, nil); err != nil {
return err
}

return nil
}
30 changes: 27 additions & 3 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ type Manager struct {
// context used when freezing node
Cancel context.CancelFunc
Ctx context.Context

// LastBlockTimeInSettlement is the time of last submitted block, used to measure batch skew time
LastBlockTimeInSettlement atomic.Int64

// LastBlockTime is the time of last produced block, used to measure batch skew time
LastBlockTime atomic.Int64
/*
Sequencer and full-node
*/
Expand Down Expand Up @@ -303,20 +309,28 @@ func (m *Manager) updateFromLastSettlementState() error {
// The SL hasn't got any batches for this chain yet.
m.logger.Info("No batches for chain found in SL.")
m.LastSettlementHeight.Store(uint64(m.Genesis.InitialHeight - 1))
m.LastBlockTimeInSettlement.Store(m.Genesis.GenesisTime.UTC().UnixNano())
return nil
}

if err != nil {
// TODO: separate between fresh rollapp and non-registered rollapp
return err
}

m.LastSettlementHeight.Store(latestHeight)

if latestHeight >= m.State.NextHeight() {
m.UpdateTargetHeight(latestHeight)
}

m.LastSettlementHeight.Store(latestHeight)

// init last block in settlement time in dymint state to calculate batch submit skew time
m.SetLastBlockTimeInSettlementFromHeight(latestHeight)

// init last block time in dymint state to calculate batch submit skew time
block, err := m.Store.LoadBlock(m.State.Height())
if err == nil {
m.LastBlockTime.Store(block.Header.GetTimestamp().UTC().UnixNano())
}
return nil
}

Expand Down Expand Up @@ -402,3 +416,13 @@ func (m *Manager) freezeNode(err error) {
uevent.MustPublish(m.Ctx, m.Pubsub, &events.DataHealthStatus{Error: err}, events.HealthStatusList)
m.Cancel()
}

// SetLastBlockTimeInSettlementFromHeight initializes LastBlockTimeInSettlement from the
// header timestamp of the block loaded from the store at lastSettlementHeight
// (the rollapp height found in settlement).
//
// If the block cannot be loaded, LastBlockTimeInSettlement is left untouched and the
// error is intentionally dropped: the value is expected to be updated afterwards, when syncing.
func (m *Manager) SetLastBlockTimeInSettlementFromHeight(lastSettlementHeight uint64) {
	settledBlock, loadErr := m.Store.LoadBlock(lastSettlementHeight)
	if loadErr == nil {
		// kept as UTC unix nanoseconds so it fits in the atomic int64 field
		settledNanos := settledBlock.Header.GetTimestamp().UTC().UnixNano()
		m.LastBlockTimeInSettlement.Store(settledNanos)
	}
}
2 changes: 1 addition & 1 deletion block/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func TestProduceOnlyAfterSynced(t *testing.T) {
assert.True(t, manager.State.Height() == 0)

// enough time to sync and produce blocks
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*4)
defer cancel()
// Capture the error returned by manager.Start.

Expand Down
7 changes: 3 additions & 4 deletions block/produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,16 +76,15 @@ func (m *Manager) ProduceBlockLoop(ctx context.Context, bytesProducedC chan int)
}

bytesProducedN := block.SizeBytes() + commit.SizeBytes()
m.logger.Info("New block.", "size", uint64(block.ToProto().Size()))

select {
case <-ctx.Done():
return nil
case bytesProducedC <- bytesProducedN:
default:
evt := &events.DataHealthStatus{Error: fmt.Errorf("bytes produced channel is full: %w", gerrc.ErrResourceExhausted)}
uevent.MustPublish(ctx, m.Pubsub, evt, events.HealthStatusList)
m.logger.Error("Enough bytes to build a batch have been accumulated, but too many batches are pending submission. " +
"Pausing block production until a signal is consumed.")
m.logger.Error("Pausing block production until new batch is submitted.", "Batch skew time", m.GetBatchSkewTime(), "Max batch skew time", m.Conf.MaxSkewTime)
select {
case <-ctx.Done():
return nil
Expand Down Expand Up @@ -184,7 +183,7 @@ func (m *Manager) produceBlock(opts ProduceBlockOptions) (*types.Block, *types.C
return nil, nil, fmt.Errorf("create commit: %w: %w", err, ErrNonRecoverable)
}

m.logger.Info("Block created.", "height", newHeight, "num_tx", len(block.Data.Txs))
m.logger.Info("Block created.", "height", newHeight, "num_tx", len(block.Data.Txs), "size", block.SizeBytes()+commit.SizeBytes())
types.RollappBlockSizeBytesGauge.Set(float64(len(block.Data.Txs)))
types.RollappBlockSizeTxsGauge.Set(float64(len(block.Data.Txs)))
return block, commit, nil
Expand Down
1 change: 0 additions & 1 deletion block/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ func (e *Executor) UpdateStateAfterCommit(s *types.State, resp *tmstate.ABCIResp
copy(s.LastHeaderHash[:], lastHeaderHash[:])

s.SetHeight(height)

if resp.EndBlock.ConsensusParamUpdates != nil {
s.ConsensusParams.Block.MaxGas = resp.EndBlock.ConsensusParamUpdates.Block.MaxGas
s.ConsensusParams.Block.MaxBytes = resp.EndBlock.ConsensusParamUpdates.Block.MaxBytes
Expand Down
81 changes: 49 additions & 32 deletions block/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/dymensionxyz/dymint/da"
"github.com/dymensionxyz/dymint/settlement"
"github.com/dymensionxyz/dymint/types"
uatomic "github.com/dymensionxyz/dymint/utils/atomic"
uchannel "github.com/dymensionxyz/dymint/utils/channel"
)

Expand All @@ -30,8 +29,10 @@ func (m *Manager) SubmitLoop(ctx context.Context,
ctx,
m.logger,
bytesProduced,
m.Conf.BatchSkew,
m.Conf.MaxSkewTime,
m.GetUnsubmittedBlocks,
m.GetUnsubmittedBytes,
m.GetBatchSkewTime,
m.Conf.BatchSubmitTime,
m.Conf.BatchSubmitBytes,
m.CreateAndSubmitBatchGetSizeBlocksCommits,
Expand All @@ -43,11 +44,13 @@ func SubmitLoopInner(
ctx context.Context,
logger types.Logger,
bytesProduced chan int, // a channel of block and commit bytes produced
maxBatchSkew uint64, // max number of blocks that submitter is allowed to have pending
unsubmittedBlocks func() uint64,
maxBatchTime time.Duration, // max time to allow between batches
maxBatchBytes uint64, // max size of serialised batch in bytes
createAndSubmitBatch func(maxSizeBytes uint64) (sizeBlocksCommits uint64, err error),
maxProduceSubmitSkewTime time.Duration, // max time between last submitted block and last produced block allowed. if this threshold is reached block production is stopped.
unsubmittedBlocksNum func() uint64,
unsubmittedBlocksBytes func() int,
batchSkewTime func() time.Duration,
maxBatchSubmitTime time.Duration, // max time to allow between batches
maxBatchSubmitBytes uint64, // max size of serialised batch in bytes
createAndSubmitBatch func(maxSizeBytes uint64) (bytes uint64, err error),
) error {
eg, ctx := errgroup.WithContext(ctx)

Expand All @@ -60,34 +63,32 @@ func SubmitLoopInner(
// 'trigger': this thread is responsible for waking up the submitter when a new block arrives, and back-pressures the block production loop
// if it gets too far ahead.
for {
if maxBatchSkew*maxBatchBytes < pendingBytes.Load() {
select {
case <-ctx.Done():
return ctx.Err()
case n := <-bytesProduced:
pendingBytes.Add(uint64(n))
logger.Debug("Added bytes produced to bytes pending submission counter.", "bytes added", n, "pending", pendingBytes.Load())
}

submitter.Nudge()

if maxProduceSubmitSkewTime < batchSkewTime() {
// too much stuff is pending submission
// we block here until we get a progress nudge from the submitter thread
select {
case <-ctx.Done():
return ctx.Err()
case <-trigger.C:
}
} else {
select {
case <-ctx.Done():
return ctx.Err()
case n := <-bytesProduced:
pendingBytes.Add(uint64(n))
logger.Debug("Added bytes produced to bytes pending submission counter.", "bytes added", n, "pending", pendingBytes.Load())
}
}

types.RollappPendingSubmissionsSkewBytes.Set(float64(pendingBytes.Load()))
types.RollappPendingSubmissionsSkewBlocks.Set(float64(unsubmittedBlocks()))
submitter.Nudge()
}
})

eg.Go(func() error {
// 'submitter': this thread actually creates and submits batches, and will do it on a timer if he isn't nudged by block production
timeLastSubmission := time.Now()
ticker := time.NewTicker(maxBatchTime / 10) // interval does not need to match max batch time since we keep track anyway, it's just to wakeup
ticker := time.NewTicker(maxBatchSubmitTime / 10) // interval does not need to match max batch time since we keep track anyway, it's just to wakeup
for {
select {
case <-ctx.Done():
Expand All @@ -97,21 +98,22 @@ func SubmitLoopInner(
}

pending := pendingBytes.Load()
types.RollappPendingSubmissionsSkewBytes.Set(float64(pendingBytes.Load()))
types.RollappPendingSubmissionsSkewBlocks.Set(float64(unsubmittedBlocks()))
types.RollappPendingSubmissionsSkewBatches.Set(float64(pendingBytes.Load() / maxBatchBytes))

// while there are accumulated blocks, create and submit batches!!
for {
done := ctx.Err() != nil
nothingToSubmit := pending == 0
lastSubmissionIsRecent := time.Since(timeLastSubmission) < maxBatchTime
maxDataNotExceeded := pending <= maxBatchBytes

lastSubmissionIsRecent := batchSkewTime() < maxBatchSubmitTime
maxDataNotExceeded := pending <= maxBatchSubmitBytes

UpdateBatchSubmissionGauges(pending, unsubmittedBlocksNum(), batchSkewTime())

if done || nothingToSubmit || (lastSubmissionIsRecent && maxDataNotExceeded) {
break
}

nConsumed, err := createAndSubmitBatch(min(pending, maxBatchBytes))
nConsumed, err := createAndSubmitBatch(maxBatchSubmitBytes)
if err != nil {
err = fmt.Errorf("create and submit batch: %w", err)
if errors.Is(err, gerrc.ErrInternal) {
Expand All @@ -126,9 +128,8 @@ func SubmitLoopInner(
}
return err
}
timeLastSubmission = time.Now()
ticker.Reset(maxBatchTime)
pending = uatomic.Uint64Sub(&pendingBytes, nConsumed)
ticker.Reset(maxBatchSubmitTime)
pending = uint64(unsubmittedBlocksBytes())
logger.Info("Submitted a batch to both sub-layers.", "n bytes consumed from pending", nConsumed, "pending after", pending) // TODO: debug level
}
trigger.Nudge()
Expand Down Expand Up @@ -219,7 +220,7 @@ func (m *Manager) CreateBatch(maxBatchSize uint64, startHeight uint64, endHeight
batch.DRSVersion = batch.DRSVersion[:len(batch.DRSVersion)-1]

if h == startHeight {
return nil, fmt.Errorf("block size exceeds max batch size: h %d: size: %d: %w", h, totalSize, gerrc.ErrOutOfRange)
return nil, fmt.Errorf("block size exceeds max batch size: h %d: batch size: %d: max size: %d err:%w", h, totalSize, maxBatchSize, gerrc.ErrOutOfRange)
}
break
}
Expand All @@ -246,7 +247,10 @@ func (m *Manager) SubmitBatch(batch *types.Batch) error {
types.RollappHubHeightGauge.Set(float64(batch.EndHeight()))
m.LastSettlementHeight.Store(batch.EndHeight())

return nil
// update last submitted block time with batch last block (used to calculate max skew time)
m.LastBlockTimeInSettlement.Store(batch.Blocks[len(batch.Blocks)-1].Header.GetTimestamp().UTC().UnixNano())

return err
}

// GetUnsubmittedBytes returns the total number of bytes that have been produced but not yet submitted.
Expand Down Expand Up @@ -300,3 +304,16 @@ func (m *Manager) UpdateLastSubmittedHeight(event pubsub.Message) {
}
}
}

// GetBatchSkewTime returns the batch skew time: the duration between the timestamp of the
// last produced block (LastBlockTime) and the timestamp of the last block submitted to SL
// (LastBlockTimeInSettlement). Both fields are read as unix-nanosecond values.
func (m *Manager) GetBatchSkewTime() time.Duration {
	producedNanos := m.LastBlockTime.Load()
	submittedNanos := m.LastBlockTimeInSettlement.Load()
	return time.Unix(0, producedNanos).Sub(time.Unix(0, submittedNanos))
}

// UpdateBatchSubmissionGauges sets the pending-submission skew gauges:
// skewBytes and skewBlocks are recorded as-is, skewTime is recorded in minutes.
func UpdateBatchSubmissionGauges(skewBytes, skewBlocks uint64, skewTime time.Duration) {
	bytesPending := float64(skewBytes)
	blocksPending := float64(skewBlocks)
	minutesBehind := skewTime.Minutes() // Minutes already yields a float64

	types.RollappPendingSubmissionsSkewBytes.Set(bytesPending)
	types.RollappPendingSubmissionsSkewBlocks.Set(blocksPending)
	types.RollappPendingSubmissionsSkewTimeMinutes.Set(minutesBehind)
}
Loading
Loading