Skip to content

Commit

Permalink
skew based on time
Browse files Browse the repository at this point in the history
  • Loading branch information
srene committed Nov 8, 2024
1 parent 2299320 commit 2f66f5e
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 17 deletions.
45 changes: 36 additions & 9 deletions block/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func (m *Manager) SubmitLoop(ctx context.Context,
bytesProduced,
m.Conf.BatchSkew,
m.GetUnsubmittedBlocks,
m.GetTimeSkew,
m.Conf.BatchSubmitTime,
m.Conf.BatchSubmitBytes,
m.CreateAndSubmitBatchGetSizeBlocksCommits,
Expand All @@ -43,8 +44,9 @@ func SubmitLoopInner(
ctx context.Context,
logger types.Logger,
bytesProduced chan int, // a channel of block and commit bytes produced
maxBatchSkew uint64, // max number of blocks that submitter is allowed to have pending
unsubmittedBlocks func() uint64,
maxBatchSkew time.Duration, // max number of blocks that submitter is allowed to have pending
unsubmittedBlocksNum func() uint64,
unsubmittedBlocksTime func() (time.Duration, error),
maxBatchTime time.Duration, // max time to allow between batches
maxBatchBytes uint64, // max size of serialised batch in bytes
createAndSubmitBatch func(maxSizeBytes uint64) (sizeBlocksCommits uint64, err error),
Expand All @@ -60,7 +62,11 @@ func SubmitLoopInner(
// 'trigger': this thread is responsible for waking up the submitter when a new block arrives, and back-pressures the block production loop
// if it gets too far ahead.
for {
if maxBatchSkew*maxBatchBytes < pendingBytes.Load() {
skewTime, err := unsubmittedBlocksTime()
if err != nil {
return err
}
if maxBatchSkew < skewTime {
// too much stuff is pending submission
// we block here until we get a progress nudge from the submitter thread
select {
Expand All @@ -79,14 +85,15 @@ func SubmitLoopInner(
}

types.RollappPendingSubmissionsSkewBytes.Set(float64(pendingBytes.Load()))
types.RollappPendingSubmissionsSkewBlocks.Set(float64(unsubmittedBlocks()))
types.RollappPendingSubmissionsSkewBlocks.Set(float64(unsubmittedBlocksNum()))
types.RollappPendingSubmissionsSkewTimeHours.Set(float64(skewTime.Hours()))

submitter.Nudge()
}
})

eg.Go(func() error {
// 'submitter': this thread actually creates and submits batches, and will do it on a timer if he isn't nudged by block production
timeLastSubmission := time.Now()
ticker := time.NewTicker(maxBatchTime / 10) // interval does not need to match max batch time since we keep track anyway, it's just to wakeup
for {
select {
Expand All @@ -96,15 +103,23 @@ func SubmitLoopInner(
case <-submitter.C:
}
pending := pendingBytes.Load()
skewTime, err := unsubmittedBlocksTime()
if err != nil {
return err
}
types.RollappPendingSubmissionsSkewBytes.Set(float64(pendingBytes.Load()))
types.RollappPendingSubmissionsSkewBlocks.Set(float64(unsubmittedBlocks()))
types.RollappPendingSubmissionsSkewBatches.Set(float64(pendingBytes.Load() / maxBatchBytes))
types.RollappPendingSubmissionsSkewBlocks.Set(float64(unsubmittedBlocksNum()))
types.RollappPendingSubmissionsSkewTimeHours.Set(float64(skewTime.Hours()))

// while there are accumulated blocks, create and submit batches!!
for {
done := ctx.Err() != nil
nothingToSubmit := pending == 0
lastSubmissionIsRecent := time.Since(timeLastSubmission) < maxBatchTime
skewTime, err := unsubmittedBlocksTime()
if err != nil {
return err
}
lastSubmissionIsRecent := skewTime < maxBatchTime
maxDataNotExceeded := pending <= maxBatchBytes
if done || nothingToSubmit || (lastSubmissionIsRecent && maxDataNotExceeded) {
break
Expand All @@ -125,7 +140,6 @@ func SubmitLoopInner(
}
return err
}
timeLastSubmission = time.Now()
ticker.Reset(maxBatchTime)
pending = uatomic.Uint64Sub(&pendingBytes, nConsumed)
logger.Info("Submitted a batch to both sub-layers.", "n bytes consumed from pending", nConsumed, "pending after", pending) // TODO: debug level
Expand Down Expand Up @@ -280,6 +294,19 @@ func (m *Manager) GetUnsubmittedBlocks() uint64 {
return m.State.Height() - m.LastSettlementHeight.Load()
}

func (m *Manager) GetTimeSkew() (time.Duration, error) {
currentBlock, err := m.Store.LoadBlock(m.State.Height())
if err != nil {
return time.Duration(0), err
}
lastSubmittedBlock, err := m.Store.LoadBlock(m.LastSubmittedHeight.Load())
if err != nil {
return time.Duration(0), err
}
return currentBlock.Header.GetTimestamp().Sub(lastSubmittedBlock.Header.GetTimestamp()), nil

}

// UpdateLastSubmittedHeight will update last height submitted height upon events.
// This may be necessary in case we crashed/restarted before getting response for our submission to the settlement layer.
func (m *Manager) UpdateLastSubmittedHeight(event pubsub.Message) {
Expand Down
14 changes: 9 additions & 5 deletions block/submit_loop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
type testArgs struct {
nParallel int // number of instances to run in parallel
testDuration time.Duration // how long to run one instance of the test (should be short)
batchSkew uint64 // max number of batches to get ahead
batchSkew time.Duration // time between last block produced and submitted
batchBytes uint64 // max number of bytes in a batch
maxTime time.Duration // maximum time to wait before submitting submissions
submitTime time.Duration // how long it takes to submit a batch
Expand Down Expand Up @@ -71,9 +71,9 @@ func testSubmitLoopInner(
default:
}
// producer shall not get too far ahead
absoluteMax := (args.batchSkew + 1) * args.batchBytes // +1 is because the producer is always blocked after the fact
nProduced := nProducedBytes.Load()
require.True(t, nProduced < absoluteMax, "produced bytes not less than maximum", "nProduced", nProduced, "max", absoluteMax)
//absoluteMax := (args.batchSkew + 1) * args.batchBytes // +1 is because the producer is always blocked after the fact
//nProduced := nProducedBytes.Load()
//require.True(t, nProduced < absoluteMax, "produced bytes not less than maximum", "nProduced", nProduced, "max", absoluteMax)
}
}()
for {
Expand Down Expand Up @@ -114,7 +114,11 @@ func testSubmitLoopInner(
return pendingBlocks.Load()
}

block.SubmitLoopInner(ctx, log.NewNopLogger(), producedBytesC, args.batchSkew, accumulatedBlocks, args.maxTime, args.batchBytes, submitBatch)
skewTime := func() (time.Duration, error) {
return 1 * time.Hour, nil
}

block.SubmitLoopInner(ctx, log.NewNopLogger(), producedBytesC, args.batchSkew, accumulatedBlocks, skewTime, args.maxTime, args.batchBytes, submitBatch)
}

// Make sure the producer does not get too far ahead
Expand Down
6 changes: 3 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type BlockManagerConfig struct {
// BatchSubmitMaxTime is how long should block manager wait for before submitting batch
BatchSubmitTime time.Duration `mapstructure:"batch_submit_time"`
// BatchSkew is the number of batches waiting to be submitted. Block production will be paused if this limit is reached.
BatchSkew uint64 `mapstructure:"max_batch_skew"`
BatchSkew time.Duration `mapstructure:"max_batch_skew"`
// The size of the batch of blocks and commits in Bytes. We'll write every batch to the DA and the settlement layer.
BatchSubmitBytes uint64 `mapstructure:"batch_submit_bytes"`
}
Expand Down Expand Up @@ -159,8 +159,8 @@ func (c BlockManagerConfig) Validate() error {
return fmt.Errorf("batch_submit_bytes must be positive")
}

if c.BatchSkew <= 0 {
return fmt.Errorf("max_batch_skew must be positive")
if c.BatchSkew < c.BatchSubmitTime {
return fmt.Errorf("max_batch_skew cannot be less than batch_submit_time %s", c.BatchSubmitTime)
}

return nil
Expand Down
5 changes: 5 additions & 0 deletions types/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ var RollappPendingSubmissionsSkewBatches = promauto.NewGauge(prometheus.GaugeOpt
Help: "The number of batches which have been accumulated but not yet submitted.",
})

var RollappPendingSubmissionsSkewTimeHours = promauto.NewGauge(prometheus.GaugeOpts{
Name: "rollapp_pending_submissions_skew_time",
Help: "Time between the last block produced and the last block submitted in hours.",
})

var RollappPendingSubmissionsSkewBytes = promauto.NewGauge(prometheus.GaugeOpts{
Name: "rollapp_pending_submissions_skew_bytes",
Help: "The number of bytes (of blocks and commits) which have been accumulated but not yet submitted.",
Expand Down

0 comments on commit 2f66f5e

Please sign in to comment.