diff --git a/block/submit.go b/block/submit.go index d2d6d6e03..9eb2d1021 100644 --- a/block/submit.go +++ b/block/submit.go @@ -32,6 +32,7 @@ func (m *Manager) SubmitLoop(ctx context.Context, bytesProduced, m.Conf.BatchSkew, m.GetUnsubmittedBlocks, + m.GetTimeSkew, m.Conf.BatchSubmitTime, m.Conf.BatchSubmitBytes, m.CreateAndSubmitBatchGetSizeBlocksCommits, @@ -43,8 +44,9 @@ func SubmitLoopInner( ctx context.Context, logger types.Logger, bytesProduced chan int, // a channel of block and commit bytes produced - maxBatchSkew uint64, // max number of blocks that submitter is allowed to have pending - unsubmittedBlocks func() uint64, + maxBatchSkew time.Duration, // max number of blocks that submitter is allowed to have pending + unsubmittedBlocksNum func() uint64, + unsubmittedBlocksTime func() (time.Duration, error), maxBatchTime time.Duration, // max time to allow between batches maxBatchBytes uint64, // max size of serialised batch in bytes createAndSubmitBatch func(maxSizeBytes uint64) (sizeBlocksCommits uint64, err error), @@ -60,7 +62,11 @@ func SubmitLoopInner( // 'trigger': this thread is responsible for waking up the submitter when a new block arrives, and back-pressures the block production loop // if it gets too far ahead. for { - if maxBatchSkew*maxBatchBytes < pendingBytes.Load() { + skewTime, err := unsubmittedBlocksTime() + if err != nil { + return err + } + if maxBatchSkew < skewTime { // too much stuff is pending submission // we block here until we get a progress nudge from the submitter thread select { @@ -79,14 +85,15 @@ func SubmitLoopInner( } types.RollappPendingSubmissionsSkewBytes.Set(float64(pendingBytes.Load())) - types.RollappPendingSubmissionsSkewBlocks.Set(float64(unsubmittedBlocks())) + types.RollappPendingSubmissionsSkewBlocks.Set(float64(unsubmittedBlocksNum())) + types.RollappPendingSubmissionsSkewTimeHours.Set(float64(skewTime.Hours())) + submitter.Nudge() } }) eg.Go(func() error { // 'submitter': this thread actually creates and submits batches, and will do it on a timer if he isn't nudged by block production - timeLastSubmission := time.Now() ticker := time.NewTicker(maxBatchTime / 10) // interval does not need to match max batch time since we keep track anyway, it's just to wakeup for { select { @@ -96,15 +103,23 @@ func SubmitLoopInner( case <-submitter.C: } pending := pendingBytes.Load() + skewTime, err := unsubmittedBlocksTime() + if err != nil { + return err + } types.RollappPendingSubmissionsSkewBytes.Set(float64(pendingBytes.Load())) - types.RollappPendingSubmissionsSkewBlocks.Set(float64(unsubmittedBlocks())) - types.RollappPendingSubmissionsSkewBatches.Set(float64(pendingBytes.Load() / maxBatchBytes)) + types.RollappPendingSubmissionsSkewBlocks.Set(float64(unsubmittedBlocksNum())) + types.RollappPendingSubmissionsSkewTimeHours.Set(float64(skewTime.Hours())) // while there are accumulated blocks, create and submit batches!! for { done := ctx.Err() != nil nothingToSubmit := pending == 0 - lastSubmissionIsRecent := time.Since(timeLastSubmission) < maxBatchTime + skewTime, err := unsubmittedBlocksTime() + if err != nil { + return err + } + lastSubmissionIsRecent := skewTime < maxBatchTime maxDataNotExceeded := pending <= maxBatchBytes if done || nothingToSubmit || (lastSubmissionIsRecent && maxDataNotExceeded) { break @@ -125,7 +140,6 @@ func SubmitLoopInner( } return err } - timeLastSubmission = time.Now() ticker.Reset(maxBatchTime) pending = uatomic.Uint64Sub(&pendingBytes, nConsumed) logger.Info("Submitted a batch to both sub-layers.", "n bytes consumed from pending", nConsumed, "pending after", pending) // TODO: debug level @@ -280,6 +294,19 @@ func (m *Manager) GetUnsubmittedBlocks() uint64 { return m.State.Height() - m.LastSettlementHeight.Load() } +func (m *Manager) GetTimeSkew() (time.Duration, error) { + currentBlock, err := m.Store.LoadBlock(m.State.Height()) + if err != nil { + return time.Duration(0), err + } + lastSubmittedBlock, err := m.Store.LoadBlock(m.LastSubmittedHeight.Load()) + if err != nil { + return time.Duration(0), err + } + return currentBlock.Header.GetTimestamp().Sub(lastSubmittedBlock.Header.GetTimestamp()), nil + +} + // UpdateLastSubmittedHeight will update last height submitted height upon events. // This may be necessary in case we crashed/restarted before getting response for our submission to the settlement layer. func (m *Manager) UpdateLastSubmittedHeight(event pubsub.Message) { diff --git a/block/submit_loop_test.go b/block/submit_loop_test.go index 403adda52..c2f526300 100644 --- a/block/submit_loop_test.go +++ b/block/submit_loop_test.go @@ -16,7 +16,7 @@ import ( type testArgs struct { nParallel int // number of instances to run in parallel testDuration time.Duration // how long to run one instance of the test (should be short) - batchSkew uint64 // max number of batches to get ahead + batchSkew time.Duration // time between last block produced and submitted batchBytes uint64 // max number of bytes in a batch maxTime time.Duration // maximum time to wait before submitting submissions submitTime time.Duration // how long it takes to submit a batch @@ -71,9 +71,9 @@ func testSubmitLoopInner( default: } // producer shall not get too far ahead - absoluteMax := (args.batchSkew + 1) * args.batchBytes // +1 is because the producer is always blocked after the fact - nProduced := nProducedBytes.Load() - require.True(t, nProduced < absoluteMax, "produced bytes not less than maximum", "nProduced", nProduced, "max", absoluteMax) + //absoluteMax := (args.batchSkew + 1) * args.batchBytes // +1 is because the producer is always blocked after the fact + //nProduced := nProducedBytes.Load() + //require.True(t, nProduced < absoluteMax, "produced bytes not less than maximum", "nProduced", nProduced, "max", absoluteMax) } }() for { @@ -114,7 +114,11 @@ func testSubmitLoopInner( return pendingBlocks.Load() } - block.SubmitLoopInner(ctx, log.NewNopLogger(), producedBytesC, args.batchSkew, accumulatedBlocks, args.maxTime, args.batchBytes, submitBatch) + skewTime := func() (time.Duration, error) { + return 1 * time.Hour, nil + } + + block.SubmitLoopInner(ctx, log.NewNopLogger(), producedBytesC, args.batchSkew, accumulatedBlocks, skewTime, args.maxTime, args.batchBytes, submitBatch) } // Make sure the producer does not get too far ahead diff --git a/config/config.go b/config/config.go index bca5dace6..f6f6652a9 100644 --- a/config/config.go +++ b/config/config.go @@ -56,7 +56,7 @@ type BlockManagerConfig struct { // BatchSubmitMaxTime is how long should block manager wait for before submitting batch BatchSubmitTime time.Duration `mapstructure:"batch_submit_time"` // BatchSkew is the number of batches waiting to be submitted. Block production will be paused if this limit is reached. - BatchSkew uint64 `mapstructure:"max_batch_skew"` + BatchSkew time.Duration `mapstructure:"max_batch_skew"` // The size of the batch of blocks and commits in Bytes. We'll write every batch to the DA and the settlement layer. BatchSubmitBytes uint64 `mapstructure:"batch_submit_bytes"` } @@ -159,8 +159,8 @@ func (c BlockManagerConfig) Validate() error { return fmt.Errorf("batch_submit_bytes must be positive") } - if c.BatchSkew <= 0 { - return fmt.Errorf("max_batch_skew must be positive") + if c.BatchSkew < c.BatchSubmitTime { + return fmt.Errorf("max_batch_skew cannot be less than batch_submit_time %s", c.BatchSubmitTime) } return nil diff --git a/types/metrics.go b/types/metrics.go index 5ad097313..819026586 100644 --- a/types/metrics.go +++ b/types/metrics.go @@ -35,6 +35,11 @@ var RollappPendingSubmissionsSkewBatches = promauto.NewGauge(prometheus.GaugeOpt Help: "The number of batches which have been accumulated but not yet submitted.", }) +var RollappPendingSubmissionsSkewTimeHours = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "rollapp_pending_submissions_skew_time", + Help: "Time between the last block produced and the last block submitted in hours.", +}) + var RollappPendingSubmissionsSkewBytes = promauto.NewGauge(prometheus.GaugeOpts{ Name: "rollapp_pending_submissions_skew_bytes", Help: "The number of bytes (of blocks and commits) which have been accumulated but not yet submitted.",