diff --git a/block/manager.go b/block/manager.go index 3c3c5a1ea..018705c5d 100644 --- a/block/manager.go +++ b/block/manager.go @@ -180,6 +180,7 @@ func (m *Manager) Start(ctx context.Context, isAggregator bool) error { m.logger.Info("Starting in aggregator mode") // TODO(omritoptix): change to private methods go m.ProduceBlockLoop(ctx) + go m.SubmitLoop(ctx) } // TODO(omritoptix): change to private methods go m.RetriveLoop(ctx) @@ -282,6 +283,36 @@ func (m *Manager) waitForSync(ctx context.Context) error { return nil } +func (m *Manager) SubmitLoop(ctx context.Context) { + ticker := time.NewTicker(m.conf.BatchSubmitMaxTime) + defer ticker.Stop() + + for { + select { + //Context canceled + case <-ctx.Done(): + return + case <-ticker.C: + // SyncTarget is the height of the last block in the last batch as seen by this node. + syncTarget := atomic.LoadUint64(&m.syncTarget) + height := m.store.Height() + //no new blocks produced yet + if (height - syncTarget) == 0 { + continue + } + + // Submit batch if we've reached the batch size and there isn't another batch currently in submission process. + if m.batchInProcess.Load() == false { + m.batchInProcess.Store(true) + go m.submitNextBatch(ctx) + } + + //TODO: add the case of batch size (should be signaled from the the block production) + // case <- requiredByNumOfBlocks + } + } +} + // ProduceBlockLoop is calling publishBlock in a loop as long as wer'e synced. func (m *Manager) ProduceBlockLoop(ctx context.Context) { atomic.StoreInt64(&m.lastSubmissionTime, time.Now().Unix()) @@ -708,21 +739,6 @@ func (m *Manager) produceBlock(ctx context.Context, allowEmpty bool) error { m.logger.Info("block created", "height", newHeight, "num_tx", len(block.Data.Txs)) rollappHeightGauge.Set(float64(newHeight)) - - //TODO: move to separate function - lastSubmissionTime := atomic.LoadInt64(&m.lastSubmissionTime) - requiredByTime := time.Since(time.Unix(0, lastSubmissionTime)) > m.conf.BatchSubmitMaxTime - - // SyncTarget is the height of the last block in the last batch as seen by this node. - syncTarget := atomic.LoadUint64(&m.syncTarget) - requiredByNumOfBlocks := (block.Header.Height - syncTarget) > m.conf.BlockBatchSize - - // Submit batch if we've reached the batch size and there isn't another batch currently in submission process. - if m.batchInProcess.Load() == false && (requiredByTime || requiredByNumOfBlocks) { - m.batchInProcess.Store(true) - go m.submitNextBatch(ctx) - } - return nil } diff --git a/block/production_test.go b/block/production_test.go index 1f11d474b..732729136 100644 --- a/block/production_test.go +++ b/block/production_test.go @@ -195,6 +195,7 @@ func TestBatchSubmissionAfterTimeout(t *testing.T) { mCtx, cancel := context.WithTimeout(context.Background(), runTime) defer cancel() go manager.ProduceBlockLoop(mCtx) + go manager.SubmitLoop(mCtx) <-mCtx.Done() require.True(manager.batchInProcess.Load() == true) diff --git a/config/config.go b/config/config.go index 83b1572f3..efed1f6b7 100644 --- a/config/config.go +++ b/config/config.go @@ -102,10 +102,6 @@ func (c BlockManagerConfig) Validate() error { return fmt.Errorf("empty_blocks_max_time must be greater than block_time") } - if c.EmptyBlocksMaxTime != 0 && c.BatchSubmitMaxTime < c.EmptyBlocksMaxTime { - return fmt.Errorf("batch_submit_max_time must be greater than empty_blocks_max_time") - } - if c.BatchSubmitMaxTime < c.BlockTime { return fmt.Errorf("batch_submit_max_time must be greater than block_time") }