From 242acb7f4b167480a11e444e509e34dd5fc22e74 Mon Sep 17 00:00:00 2001 From: Daniel T <30197399+danwt@users.noreply.github.com> Date: Mon, 22 Jul 2024 17:54:00 +0100 Subject: [PATCH] fix(submission): fix counting and time (#969) Co-authored-by: zale144 --- block/manager.go | 37 ++-- block/manager_test.go | 30 +-- block/produce.go | 54 +++-- block/production_test.go | 29 ++- block/state.go | 2 +- block/submit.go | 294 +++++++++++++------------ block/submit_loop_test.go | 155 +++++++++++++ block/submit_test.go | 36 +-- config/config.go | 14 +- config/config_test.go | 16 +- config/defaults.go | 12 +- config/flags.go | 2 +- config/toml.go | 4 +- da/avail/avail_test.go | 102 +-------- da/celestia/celestia_test.go | 28 +-- da/celestia/rpc_test.go | 14 +- da/da_test.go | 12 +- da/local/local.go | 6 +- node/node_test.go | 8 +- rpc/client/client_test.go | 40 ++-- rpc/json/service_test.go | 10 +- settlement/dymension/dymension.go | 26 +-- settlement/dymension/dymension_test.go | 2 +- settlement/grpc/grpc.go | 6 +- settlement/local/local.go | 4 +- settlement/local/local_test.go | 18 +- testutil/block.go | 9 +- testutil/node.go | 8 +- testutil/types.go | 32 ++- types/batch.go | 45 +++- types/block.go | 8 + types/errors.go | 2 +- types/metrics.go | 10 + types/serialization.go | 17 +- utils/channel/funcs.go | 31 +++ utils/event/funcs.go | 8 +- 36 files changed, 657 insertions(+), 474 deletions(-) create mode 100644 block/submit_loop_test.go create mode 100644 utils/channel/funcs.go diff --git a/block/manager.go b/block/manager.go index a1e4314cb..78d5ce885 100644 --- a/block/manager.go +++ b/block/manager.go @@ -50,11 +50,6 @@ type Manager struct { DAClient da.DataAvailabilityLayerClient SLClient settlement.ClientI - /* - Production - */ - producedSizeC chan uint64 // for the producer to report the size of the block+commit it produced - /* Submission */ @@ -114,7 +109,6 @@ func NewManager( SLClient: settlementClient, Retriever: dalc.(da.BatchRetriever), targetSyncHeight: diodes.NewOneToOne(1, nil), - producedSizeC: make(chan uint64), logger: logger, blockCache: make(map[uint64]CachedBlock), } @@ -163,11 +157,16 @@ func (m *Manager) Start(ctx context.Context) error { if isSequencer { // Sequencer must wait till DA is synced to start submitting blobs <-m.DAClient.Synced() + nBytes := m.GetUnsubmittedBytes() + bytesProducedC := make(chan int) + go func() { + bytesProducedC <- nBytes + }() eg.Go(func() error { - return m.SubmitLoop(ctx) + return m.SubmitLoop(ctx, bytesProducedC) }) eg.Go(func() error { - return m.ProduceBlockLoop(ctx) + return m.ProduceBlockLoop(ctx, bytesProducedC) }) } else { eg.Go(func() error { @@ -177,6 +176,12 @@ func (m *Manager) Start(ctx context.Context) error { return m.SyncToTargetHeightLoop(ctx) }) } + + go func() { + err := eg.Wait() + m.logger.Info("Block manager err group finished.", "err", err) + }() + return nil } @@ -220,19 +225,3 @@ func (m *Manager) syncBlockManager() error { m.logger.Info("Synced.", "current height", m.State.Height(), "last submitted height", m.LastSubmittedHeight.Load()) return nil } - -func (m *Manager) MustLoadBlock(h uint64) *types.Block { - ret, err := m.Store.LoadBlock(h) - if err != nil { - panic(fmt.Errorf("store load block: height: %d: %w", h, err)) - } - return ret -} - -func (m *Manager) MustLoadCommit(h uint64) *types.Commit { - ret, err := m.Store.LoadCommit(h) - if err != nil { - panic(fmt.Errorf("store load commit: height: %d: %w", h, err)) - } - return ret -} diff --git a/block/manager_test.go b/block/manager_test.go index 
6c88fee31..dcae5d160 100644
--- a/block/manager_test.go
+++ b/block/manager_test.go
@@ -128,7 +128,7 @@ func TestProduceOnlyAfterSynced(t *testing.T) {
 		assert.Equal(t, daResultSubmitBatch.Code, da.StatusSuccess)
 		err = manager.SLClient.SubmitBatch(batch, manager.DAClient.GetClientType(), &daResultSubmitBatch)
 		require.NoError(t, err)
-		nextBatchStartHeight = batch.EndHeight + 1
+		nextBatchStartHeight = batch.EndHeight() + 1
 		// Wait until daHeight is updated
 		time.Sleep(time.Millisecond * 500)
 	}
@@ -148,9 +148,9 @@ func TestProduceOnlyAfterSynced(t *testing.T) {
 		assert.NoError(t, err)
 	}()
 	<-ctx.Done()
-	assert.Equal(t, batch.EndHeight, manager.LastSubmittedHeight.Load())
+	assert.Equal(t, batch.EndHeight(), manager.LastSubmittedHeight.Load())
 	// validate that we produced blocks
-	assert.Greater(t, manager.State.Height(), batch.EndHeight)
+	assert.Greater(t, manager.State.Height(), batch.EndHeight())
 }
 func TestRetrieveDaBatchesFailed(t *testing.T) {
@@ -319,7 +319,7 @@ func TestCreateNextDABatchWithBytesLimit(t *testing.T) {
 	require.NoError(err)
 	// Init manager
 	managerConfig := testutil.GetManagerConfig()
-	managerConfig.BlockBatchMaxSizeBytes = batchLimitBytes // enough for 2 block, not enough for 10 blocks
+	managerConfig.BatchMaxSizeBytes = batchLimitBytes // enough for 2 blocks, not enough for 10 blocks
 	manager, err := testutil.GetManager(managerConfig, nil, nil, 1, 1, 0, proxyApp, nil)
 	require.NoError(err)
@@ -354,23 +354,23 @@
 			// Call CreateBatch function
 			startHeight := manager.NextHeightToSubmit()
 			endHeight := startHeight + uint64(tc.blocksToProduce) - 1
-			batch, err := manager.CreateNextBatchToSubmit(startHeight, endHeight)
+			batch, err := manager.CreateBatch(manager.Conf.BatchMaxSizeBytes, startHeight, endHeight)
 			assert.NoError(err)
-			assert.Equal(batch.StartHeight, startHeight)
-			assert.LessOrEqual(batch.ToProto().Size(), int(managerConfig.BlockBatchMaxSizeBytes))
+			assert.Equal(batch.StartHeight(), startHeight)
+			assert.LessOrEqual(batch.SizeBytes(), int(managerConfig.BatchMaxSizeBytes))
 			if !tc.expectedToBeTruncated {
-				assert.Equal(batch.EndHeight, endHeight)
+				assert.Equal(batch.EndHeight(), endHeight)
 			} else {
-				assert.Equal(batch.EndHeight, batch.StartHeight+uint64(len(batch.Blocks))-1)
-				assert.Less(batch.EndHeight, endHeight)
+				assert.Equal(batch.EndHeight(), batch.StartHeight()+batch.NumBlocks()-1)
+				assert.Less(batch.EndHeight(), endHeight)
 				// validate next added block to batch would have been actually too big
-				// First relax the byte limit so we could proudce larger batch
-				manager.Conf.BlockBatchMaxSizeBytes = 10 * manager.Conf.BlockBatchMaxSizeBytes
-				newBatch, err := manager.CreateNextBatchToSubmit(startHeight, batch.EndHeight+1)
-				assert.Greater(newBatch.ToProto().Size(), batchLimitBytes)
+				// First relax the byte limit so we can produce a larger batch
+				manager.Conf.BatchMaxSizeBytes = 10 * manager.Conf.BatchMaxSizeBytes
+				newBatch, err := manager.CreateBatch(manager.Conf.BatchMaxSizeBytes, startHeight, batch.EndHeight()+1)
+				assert.Greater(newBatch.SizeBytes(), batchLimitBytes)
 				assert.NoError(err)
 			}
@@ -431,7 +431,7 @@ func TestDAFetch(t *testing.T) {
 		t.Run(c.name, func(t *testing.T) {
 			app.On("Commit", mock.Anything).Return(abci.ResponseCommit{Data: commitHash[:]}).Once()
 			app.On("Info", mock.Anything).Return(abci.ResponseInfo{
-				LastBlockHeight:  int64(batch.EndHeight),
+				LastBlockHeight:  int64(batch.EndHeight()),
 				LastBlockAppHash: commitHash[:],
 			})
 			err := manager.ProcessNextDABatch(c.daMetaData)
diff --git a/block/produce.go b/block/produce.go
index 6c8c78aa5..b83ba0584 100644
--- a/block/produce.go
+++ b/block/produce.go
@@ -21,7 +21,9 @@ import (
 )
 // ProduceBlockLoop is calling publishBlock in a loop as long as we're synced.
-func (m *Manager) ProduceBlockLoop(ctx context.Context) (err error) {
+// A signal will be sent to the bytesProduced channel for each block produced.
+// In this way it's possible to pause block production by not consuming the channel.
+func (m *Manager) ProduceBlockLoop(ctx context.Context, bytesProducedC chan int) error {
 	m.logger.Info("Started block producer loop.")
 	ticker := time.NewTicker(m.Conf.BlockTime)
@@ -36,52 +38,58 @@
 	for {
 		select {
 		case <-ctx.Done():
-			return
+			return nil
 		case <-ticker.C:
 			// if empty blocks are configured to be enabled, and one is scheduled...
 			produceEmptyBlock := firstBlock || 0 == m.Conf.MaxIdleTime || nextEmptyBlock.Before(time.Now())
 			firstBlock = false
-			var (
-				block  *types.Block
-				commit *types.Commit
-			)
-			block, commit, err = m.ProduceAndGossipBlock(ctx, produceEmptyBlock)
+			block, commit, err := m.ProduceAndGossipBlock(ctx, produceEmptyBlock)
 			if errors.Is(err, context.Canceled) {
 				m.logger.Error("Produce and gossip: context canceled.", "error", err)
-				return
+				return nil
 			}
-			if errors.Is(err, types.ErrSkippedEmptyBlock) {
+			if errors.Is(err, types.ErrEmptyBlock) { // occurs if the block was empty but we don't want to produce one
 				continue
 			}
 			if errors.Is(err, ErrNonRecoverable) {
-				m.logger.Error("Produce and gossip: non-recoverable.", "error", err) // TODO: flush? or don't log at all?
 				uevent.MustPublish(ctx, m.Pubsub, &events.DataHealthStatus{Error: err}, events.HealthStatusList)
-				return
+				return err
 			}
 			if err != nil {
 				m.logger.Error("Produce and gossip: uncategorized, assuming recoverable.", "error", err)
 				continue
 			}
-			// If IBC transactions are present, set proof required to true
-			// This will set a shorter timer for the next block
-			// currently we set it for all txs as we don't have a way to determine if an IBC tx is present (https://github.com/dymensionxyz/dymint/issues/709)
 			nextEmptyBlock = time.Now().Add(m.Conf.MaxIdleTime)
 			if 0 < len(block.Data.Txs) {
+				// the block wasn't empty so we want to make sure we don't wait too long before producing another one, in order to facilitate proofs for ibc
+				// TODO: optimize to only do this if IBC transactions are present (https://github.com/dymensionxyz/dymint/issues/709)
 				nextEmptyBlock = time.Now().Add(m.Conf.MaxProofTime)
 			} else {
 				m.logger.Info("Produced empty block.")
 			}
-			// Send the size to the accumulated size channel
-			// This will block in case the submitter is too slow and it's buffer is full
-			size := uint64(block.ToProto().Size()) + uint64(commit.ToProto().Size())
+			bytesProducedN := block.SizeBytes() + commit.SizeBytes()
 			select {
 			case <-ctx.Done():
-				return
-			case m.producedSizeC <- size:
+				return nil
+			case bytesProducedC <- bytesProducedN:
+			default:
+				evt := &events.DataHealthStatus{Error: fmt.Errorf("bytes produced channel is full: %w", gerrc.ErrResourceExhausted)}
+				uevent.MustPublish(ctx, m.Pubsub, evt, events.HealthStatusList)
+				m.logger.Error("Enough bytes to build a batch have been accumulated, but too many batches are pending submission. "
+ + "Pausing block production until a signal is consumed.") + select { + case <-ctx.Done(): + return nil + case bytesProducedC <- bytesProducedN: + evt := &events.DataHealthStatus{Error: nil} + uevent.MustPublish(ctx, m.Pubsub, evt, events.HealthStatusList) + m.logger.Info("Resumed block production.") + } } + } } } @@ -139,10 +147,10 @@ func (m *Manager) produceBlock(allowEmpty bool) (*types.Block, *types.Commit, er return nil, nil, fmt.Errorf("load block: height: %d: %w: %w", newHeight, err, ErrNonRecoverable) } else { // limit to the max block data, so we don't create a block that is too big to fit in a batch - maxBlockDataSize := uint64(float64(m.Conf.BlockBatchMaxSizeBytes) * types.MaxBlockSizeAdjustment) + maxBlockDataSize := uint64(float64(m.Conf.BatchMaxSizeBytes) * types.MaxBlockSizeAdjustment) block = m.Executor.CreateBlock(newHeight, lastCommit, lastHeaderHash, m.State, maxBlockDataSize) if !allowEmpty && len(block.Data.Txs) == 0 { - return nil, nil, fmt.Errorf("%w: %w", types.ErrSkippedEmptyBlock, ErrRecoverable) + return nil, nil, fmt.Errorf("%w: %w", types.ErrEmptyBlock, ErrRecoverable) } abciHeaderPb := types.ToABCIHeaderPB(&block.Header) @@ -201,8 +209,8 @@ func (m *Manager) createTMSignature(block *types.Block, proposerAddress []byte, v := vote.ToProto() // convert libp2p key to tm key // TODO: move to types - raw_key, _ := m.LocalKey.Raw() - tmprivkey := tmed25519.PrivKey(raw_key) + rawKey, _ := m.LocalKey.Raw() + tmprivkey := tmed25519.PrivKey(rawKey) tmprivkey.PubKey().Bytes() // Create a mock validator to sign the vote tmvalidator := tmtypes.NewMockPVWithParams(tmprivkey, false, false) diff --git a/block/production_test.go b/block/production_test.go index 3615d72bb..a1589f409 100644 --- a/block/production_test.go +++ b/block/production_test.go @@ -13,6 +13,7 @@ import ( "github.com/dymensionxyz/dymint/mempool" mempoolv1 "github.com/dymensionxyz/dymint/mempool/v1" "github.com/dymensionxyz/dymint/node/events" + uchannel "github.com/dymensionxyz/dymint/utils/channel" uevent "github.com/dymensionxyz/dymint/utils/event" tmcfg "github.com/tendermint/tendermint/config" @@ -57,13 +58,11 @@ func TestCreateEmptyBlocksEnableDisable(t *testing.T) { mCtx, cancel := context.WithTimeout(context.Background(), runTime) defer cancel() - go manager.ProduceBlockLoop(mCtx) - go managerWithEmptyBlocks.ProduceBlockLoop(mCtx) - - buf1 := make(chan struct{}, 100) // dummy to avoid unhealthy event - buf2 := make(chan struct{}, 100) // dummy to avoid unhealthy event - go manager.AccumulatedDataLoop(mCtx, buf1) - go managerWithEmptyBlocks.AccumulatedDataLoop(mCtx, buf2) + bytesProduced1 := make(chan int) + bytesProduced2 := make(chan int) + go manager.ProduceBlockLoop(mCtx, bytesProduced1) + go managerWithEmptyBlocks.ProduceBlockLoop(mCtx, bytesProduced2) + uchannel.DrainForever(bytesProduced1, bytesProduced2) <-mCtx.Done() require.Greater(manager.State.Height(), initialHeight) @@ -143,7 +142,9 @@ func TestCreateEmptyBlocksNew(t *testing.T) { mCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - go manager.ProduceBlockLoop(mCtx) + bytesProduced := make(chan int) + go manager.ProduceBlockLoop(mCtx, bytesProduced) + uchannel.DrainForever(bytesProduced) <-time.Tick(1 * time.Second) err = mpool.CheckTx([]byte{1, 2, 3, 4}, nil, mempool.TxInfo{}) @@ -182,7 +183,7 @@ func TestStopBlockProduction(t *testing.T) { require := require.New(t) managerConfig := testutil.GetManagerConfig() - managerConfig.BlockBatchMaxSizeBytes = 1000 // small batch size to fill up quickly + 
managerConfig.BatchMaxSizeBytes = 1000 // small batch size to fill up quickly manager, err := testutil.GetManager(managerConfig, nil, nil, 1, 1, 0, nil, nil) require.NoError(err) @@ -201,14 +202,10 @@ func TestStopBlockProduction(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - go func() { - manager.ProduceBlockLoop(ctx) - wg.Done() // Decrease counter when this goroutine finishes - }() + bytesProducedC := make(chan int) - toSubmit := make(chan struct{}) go func() { - manager.AccumulatedDataLoop(ctx, toSubmit) + manager.ProduceBlockLoop(ctx, bytesProducedC) wg.Done() // Decrease counter when this goroutine finishes }() @@ -232,7 +229,7 @@ func TestStopBlockProduction(t *testing.T) { assert.Equal(stoppedHeight, manager.State.Height()) // consume the signal - <-toSubmit + <-bytesProducedC // check for health status event and block production to continue select { diff --git a/block/state.go b/block/state.go index 36c5ffc19..ea49dbacd 100644 --- a/block/state.go +++ b/block/state.go @@ -145,7 +145,7 @@ func (e *Executor) UpdateMempoolAfterInitChain(s *types.State) { e.mempool.SetPostCheckFn(mempool.PostCheckMaxGas(s.ConsensusParams.Block.MaxGas)) } -// Update state from Commit response +// UpdateStateAfterCommit using commit response func (e *Executor) UpdateStateAfterCommit(s *types.State, resp *tmstate.ABCIResponses, appHash []byte, height uint64, valSet *tmtypes.ValidatorSet) { copy(s.AppHash[:], appHash[:]) copy(s.LastResultsHash[:], tmtypes.NewResults(resp.DeliverTxs).Hash()) diff --git a/block/submit.go b/block/submit.go index 14d551d35..f4e6cbe03 100644 --- a/block/submit.go +++ b/block/submit.go @@ -2,168 +2,152 @@ package block import ( "context" + "errors" "fmt" + "sync/atomic" "time" - "github.com/dymensionxyz/gerr-cosmos/gerrc" - "github.com/dymensionxyz/dymint/da" - "github.com/dymensionxyz/dymint/node/events" "github.com/dymensionxyz/dymint/types" - uevent "github.com/dymensionxyz/dymint/utils/event" + uchannel "github.com/dymensionxyz/dymint/utils/channel" + "github.com/dymensionxyz/gerr-cosmos/gerrc" + "golang.org/x/sync/errgroup" ) // SubmitLoop is the main loop for submitting blocks to the DA and SL layers. // It submits a batch when either // 1) It accumulates enough block data, so it's necessary to submit a batch to avoid exceeding the max size // 2) Enough time passed since the last submitted batch, so it's necessary to submit a batch to avoid exceeding the max time -func (m *Manager) SubmitLoop(ctx context.Context) (err error) { - maxTime := time.NewTicker(m.Conf.BatchSubmitMaxTime) - defer maxTime.Stop() - - // get produced size from the block production loop and signal to submit the batch when batch size reached - maxSizeC := make(chan struct{}, m.Conf.MaxSupportedBatchSkew) - go m.AccumulatedDataLoop(ctx, maxSizeC) - - // defer func to clear the channels to release blocked goroutines on shutdown - defer func() { - m.logger.Info("Stopped submit loop.") +// It will back pressure (pause) block production if it falls too far behind. 
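The comment above is the heart of the new design: ProduceBlockLoop pushes the byte count of every block+commit into an unbuffered channel, and SubmitLoop is the only consumer, so the moment the submitter stops receiving, the producer's send blocks and production pauses. A minimal, self-contained sketch of that back-pressure pattern (names are illustrative, not dymint's API):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	bytesProducedC := make(chan int) // unbuffered: a pure hand-off, as in the patch
	done := make(chan struct{})

	// Producer: sends the size of each "block". If nobody receives,
	// the send blocks, which is exactly the pause described above.
	go func() {
		for i := 0; ; i++ {
			select {
			case <-done:
				return
			case bytesProducedC <- 100: // pretend each block+commit is 100 bytes
				fmt.Println("produced block", i)
			}
			time.Sleep(10 * time.Millisecond)
		}
	}()

	// Consumer (the submitter's role): stop receiving once too much is pending.
	pending := 0
	const maxPending = 300 // stands in for maxBatchSkew * maxBatchBytes
	for pending < maxPending {
		pending += <-bytesProducedC
	}
	fmt.Println("pending bytes:", pending, "- not receiving, so the producer is paused")
	time.Sleep(50 * time.Millisecond) // observe: no more "produced block" lines
	close(done)
}
```

Using an unbuffered channel makes the pause implicit: no flag or lock is needed, the producer simply cannot make progress until the consumer catches up.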
+func (m *Manager) SubmitLoop(ctx context.Context, + bytesProduced chan int, +) (err error) { + return SubmitLoopInner(ctx, + bytesProduced, + m.Conf.MaxBatchSkew, + m.Conf.BatchSubmitMaxTime, + m.Conf.BatchMaxSizeBytes, + m.CreateAndSubmitBatchGetSizeBlocksCommits, + ) +} +// SubmitLoopInner is a unit testable impl of SubmitLoop +func SubmitLoopInner(ctx context.Context, + bytesProduced chan int, // a channel of block and commit bytes produced + maxBatchSkew uint64, // max number of batches that submitter is allowed to have pending + maxBatchTime time.Duration, // max time to allow between batches + maxBatchBytes uint64, // max size of serialised batch in bytes + createAndSubmitBatch func(maxSizeBytes uint64) (sizeBlocksCommits uint64, err error), +) error { + eg, ctx := errgroup.WithContext(ctx) + + pendingBytes := atomic.Uint64{} + trigger := uchannel.NewNudger() // used to avoid busy waiting (using cpu) on trigger thread + submitter := uchannel.NewNudger() // used to avoid busy waiting (using cpu) on submitter thread + + eg.Go(func() error { + // 'trigger': we need one thread to continuously consume the bytes produced channel, and to monitor timer + ticker := time.NewTicker(maxBatchTime) + defer ticker.Stop() for { - select { - case <-m.producedSizeC: - case <-maxSizeC: - default: - return + if maxBatchSkew*maxBatchBytes < pendingBytes.Load() { + // too much stuff is pending submission + // we block here until we get a progress nudge from the submitter thread + select { + case <-ctx.Done(): + return ctx.Err() + case <-trigger.C: + case <-ticker.C: + // It's theoretically possible for the thread scheduler to pause this thread after entering this if statement + // for enough time for the submitter thread to submit all the pending bytes and do the nudge, and then for the + // thread scheduler to wake up this thread after the nudge has been missed, which would be a deadlock. + // Although this is only a theoretical possibility which should never happen in practice, it may be possible, e.g. + // in adverse CPU conditions or tests using compressed timeframes. To be sound, we also nudge with the ticker, which + // has no downside. + } + } else { + select { + case <-ctx.Done(): + return ctx.Err() + case n := <-bytesProduced: + pendingBytes.Add(uint64(n)) + case <-ticker.C: + } } - } - }() - - for { - select { - case <-ctx.Done(): - return - case <-maxSizeC: - case <-maxTime.C: - } - // modular submission methods have own retries mechanism. - // if error returned, we assume it's unrecoverable. - err = m.HandleSubmissionTrigger() - if err != nil { - m.logger.Error("Error submitting batch", "error", err) - uevent.MustPublish(ctx, m.Pubsub, &events.DataHealthStatus{Error: err}, events.HealthStatusList) - return + types.RollappPendingSubmissionsSkewNumBytes.Set(float64(pendingBytes.Load())) + types.RollappPendingSubmissionsSkewNumBatches.Set(float64(pendingBytes.Load() / maxBatchBytes)) + submitter.Nudge() } - maxTime.Reset(m.Conf.BatchSubmitMaxTime) - } -} - -// AccumulatedDataLoop is the main loop for accumulating the produced data size. -// It is triggered by the ProducedSizeCh channel, which is populated by the block production loop when a new block is produced. -// It accumulates the size of the produced data and triggers the submission of the batch when the accumulated size is greater than the max size. -// It also emits a health status event when the submission channel is full. 
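One line in SubmitLoopInner above is easy to misread: `pending = pendingBytes.Add(^(nConsumed - 1))`. The unsigned atomic types in `sync/atomic` have no Sub method; in two's complement `^x == -x-1`, so `^(n-1) == -n`, and adding it subtracts n from the counter. A tiny runnable demonstration of the identity:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var pending atomic.Uint64
	pending.Add(500) // bytes produced so far

	consumed := uint64(120)
	// atomic.Uint64 has no Sub; adding the two's complement of consumed
	// (^(consumed-1) == -consumed mod 2^64) decrements the counter by it.
	left := pending.Add(^(consumed - 1))
	fmt.Println(left) // 380
}
```

The standard library documents the same trick for the older AddUint64-style functions; it is safe as long as the counter never goes below zero.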
-func (m *Manager) AccumulatedDataLoop(ctx context.Context, toSubmit chan struct{}) { - total := uint64(0) - - /* - On node start we want to include the count of any blocks which were produced and not submitted in a previous instance - */ - currH := m.State.Height() - for h := m.LastSubmittedHeight.Load() + 1; h <= currH; h++ { - block := m.MustLoadBlock(h) - commit := m.MustLoadCommit(h) - total += uint64(block.ToProto().Size()) + uint64(commit.ToProto().Size()) - } - - for { - for m.Conf.BlockBatchMaxSizeBytes <= total { // TODO: allow some tolerance for block size (e.g support for BlockBatchMaxSize +- 10%) - total -= m.Conf.BlockBatchMaxSizeBytes + }) + eg.Go(func() error { + // 'submitter': this thread actually creates and submits batches + timeLastSubmission := time.Now() + for { select { case <-ctx.Done(): - return - case toSubmit <- struct{}{}: - m.logger.Info("Enough bytes to build a batch have been accumulated. Sent signal to submit the batch.") - default: - m.logger.Error("Enough bytes to build a batch have been accumulated, but too many batches are pending submission. " + - "Pausing block production until a batch submission signal is consumed.") - - evt := &events.DataHealthStatus{Error: fmt.Errorf("submission channel is full: %w", gerrc.ErrResourceExhausted)} - uevent.MustPublish(ctx, m.Pubsub, evt, events.HealthStatusList) - - /* - Now we block until earlier batches have been submitted. This has the effect of not consuming the producedSizeC, - which will stop new block production. - */ - select { - case <-ctx.Done(): - return - case toSubmit <- struct{}{}: + return ctx.Err() + case <-submitter.C: + } + pending := pendingBytes.Load() + // while there are accumulated blocks, create and submit batches!! + for { + done := ctx.Err() != nil + nothingToSubmit := pending == 0 + lastSubmissionIsRecent := time.Since(timeLastSubmission) < maxBatchTime + maxDataNotExceeded := pending <= maxBatchBytes + if done || nothingToSubmit || (lastSubmissionIsRecent && maxDataNotExceeded) { + break } - - evt = &events.DataHealthStatus{Error: nil} - uevent.MustPublish(ctx, m.Pubsub, evt, events.HealthStatusList) - - m.logger.Info("Resumed block production.") + nConsumed, err := createAndSubmitBatch(min(pending, maxBatchBytes)) + if err != nil { + return fmt.Errorf("create and submit batch: %w", err) + } + timeLastSubmission = time.Now() + pending = pendingBytes.Add(^(nConsumed - 1)) // subtract } + trigger.Nudge() } - select { - case <-ctx.Done(): - return - case size := <-m.producedSizeC: - total += size - } - } -} + }) -// HandleSubmissionTrigger processes the sublayer submission trigger event. It checks if there are new blocks produced since the last submission. -// If there are, it attempts to submit a batch of blocks. It then attempts to produce an empty block to ensure IBC messages -// pass through during the batch submission process due to proofs requires for ibc messages only exist on the next block. -// Finally, it submits the next batch of blocks and updates the sync target to the height of the last block in the submitted batch. -func (m *Manager) HandleSubmissionTrigger() error { - // Load current sync target and height to determine if new blocks are available for submission. - - startHeight := m.NextHeightToSubmit() - endHeightInclusive := m.State.Height() + return eg.Wait() +} - if endHeightInclusive < startHeight { - return nil // No new blocks have been produced +// CreateAndSubmitBatchGetSizeBlocksCommits creates and submits a batch to the DA and SL. 
+// Returns size of block and commit bytes +// max size bytes is the maximum size of the serialized batch type +func (m *Manager) CreateAndSubmitBatchGetSizeBlocksCommits(maxSize uint64) (uint64, error) { + b, err := m.CreateAndSubmitBatch(maxSize) + if b == nil { + return 0, err } + return uint64(b.SizeBlockAndCommitBytes()), err +} - nextBatch, err := m.CreateNextBatchToSubmit(startHeight, endHeightInclusive) +// CreateAndSubmitBatch creates and submits a batch to the DA and SL. +// max size bytes is the maximum size of the serialized batch type +func (m *Manager) CreateAndSubmitBatch(maxSizeBytes uint64) (*types.Batch, error) { + b, err := m.CreateBatch(maxSizeBytes, m.NextHeightToSubmit(), m.State.Height()) if err != nil { - return fmt.Errorf("create next batch to submit: %w", err) + return nil, fmt.Errorf("create batch: %w", err) } - resultSubmitToDA := m.DAClient.SubmitBatch(nextBatch) - m.logger.Info("Submitted batch to DA", "start height", nextBatch.StartHeight, "end height", nextBatch.EndHeight) - if resultSubmitToDA.Code != da.StatusSuccess { - return fmt.Errorf("submit next batch to da: %s", resultSubmitToDA.Message) + if err := m.SubmitBatch(b); err != nil { + return nil, fmt.Errorf("submit batch: %w", err) } - - actualEndHeight := nextBatch.EndHeight - - err = m.SLClient.SubmitBatch(nextBatch, m.DAClient.GetClientType(), &resultSubmitToDA) - if err != nil { - return fmt.Errorf("sl client submit batch: start height: %d: inclusive end height: %d: %w", startHeight, actualEndHeight, err) - } - m.logger.Info("Submitted batch to SL.", "start height", nextBatch.StartHeight, "end height", actualEndHeight) - - types.RollappHubHeightGauge.Set(float64(actualEndHeight)) - m.LastSubmittedHeight.Store(actualEndHeight) - return nil + return b, nil } -func (m *Manager) CreateNextBatchToSubmit(startHeight uint64, endHeightInclusive uint64) (*types.Batch, error) { +// CreateBatch looks through the store for any unsubmitted blocks and commits and bundles them into a batch +// max size bytes is the maximum size of the serialized batch type +func (m *Manager) CreateBatch(maxBatchSize uint64, startHeight uint64, endHeightInclusive uint64) (*types.Batch, error) { batchSize := endHeightInclusive - startHeight + 1 batch := &types.Batch{ - StartHeight: startHeight, - Blocks: make([]*types.Block, 0, batchSize), - Commits: make([]*types.Commit, 0, batchSize), + Blocks: make([]*types.Block, 0, batchSize), + Commits: make([]*types.Commit, 0, batchSize), } - // Populate the batch for height := startHeight; height <= endHeightInclusive; height++ { block, err := m.Store.LoadBlock(height) if err != nil { @@ -177,25 +161,65 @@ func (m *Manager) CreateNextBatchToSubmit(startHeight uint64, endHeightInclusive batch.Blocks = append(batch.Blocks, block) batch.Commits = append(batch.Commits, commit) - // Check if the batch size is too big - totalSize := batch.ToProto().Size() - if totalSize > int(m.Conf.BlockBatchMaxSizeBytes) { - // Nil out the last block and commit - batch.Blocks[len(batch.Blocks)-1] = nil - batch.Commits[len(batch.Commits)-1] = nil + totalSize := batch.SizeBytes() + if int(maxBatchSize) < totalSize { // Remove the last block and commit from the batch batch.Blocks = batch.Blocks[:len(batch.Blocks)-1] batch.Commits = batch.Commits[:len(batch.Commits)-1] if height == startHeight { - return nil, fmt.Errorf("block size exceeds max batch size: height %d: size: %d", height, totalSize) + return nil, fmt.Errorf("block size exceeds max batch size: height %d: size: %d: %w", height, totalSize, 
gerrc.ErrOutOfRange)
 			}
 			break
 		}
-
-		batch.EndHeight = height
 	}
 	return batch, nil
 }
+
+func (m *Manager) SubmitBatch(batch *types.Batch) error {
+	resultSubmitToDA := m.DAClient.SubmitBatch(batch)
+	if resultSubmitToDA.Code != da.StatusSuccess {
+		return fmt.Errorf("da client submit batch: %s", resultSubmitToDA.Message)
+	}
+	m.logger.Info("Submitted batch to DA.", "start height", batch.StartHeight(), "end height", batch.EndHeight())
+
+	err := m.SLClient.SubmitBatch(batch, m.DAClient.GetClientType(), &resultSubmitToDA)
+	if err != nil {
+		return fmt.Errorf("sl client submit batch: start height: %d: end height: %d: %w", batch.StartHeight(), batch.EndHeight(), err)
+	}
+	m.logger.Info("Submitted batch to SL.", "start height", batch.StartHeight(), "end height", batch.EndHeight())
+
+	types.RollappHubHeightGauge.Set(float64(batch.EndHeight()))
+	m.LastSubmittedHeight.Store(batch.EndHeight())
+	return nil
+}
+
+// GetUnsubmittedBytes returns the total number of unsubmitted block and commit bytes.
+// Intended only to be used at startup, before the block production and submission loops start.
+func (m *Manager) GetUnsubmittedBytes() int {
+	total := 0
+	/*
+		On node start we want to include the count of any blocks which were produced and not submitted in a previous instance
+	*/
+	currH := m.State.Height()
+	for h := m.NextHeightToSubmit(); h <= currH; h++ {
+		block, err := m.Store.LoadBlock(h)
+		if err != nil {
+			if !errors.Is(err, gerrc.ErrNotFound) {
+				m.logger.Error("Get unsubmitted bytes load block.", "err", err)
+			}
+			break
+		}
+		commit, err := m.Store.LoadCommit(h)
+		if err != nil {
+			if !errors.Is(err, gerrc.ErrNotFound) {
+				m.logger.Error("Get unsubmitted bytes load commit.", "err", err)
+			}
+			break
+		}
+		total += block.SizeBytes() + commit.SizeBytes()
+	}
+	return total
+}
diff --git a/block/submit_loop_test.go b/block/submit_loop_test.go
new file mode 100644
index 000000000..9faba65c1
--- /dev/null
+++ b/block/submit_loop_test.go
@@ -0,0 +1,155 @@
+package block_test
+
+import (
+	"context"
+	"math/rand"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/dymensionxyz/dymint/block"
+	"github.com/stretchr/testify/require"
+)
+
+type testArgs struct {
+	nParallel                 int           // number of instances to run in parallel
+	testDuration              time.Duration // how long to run one instance of the test (should be short)
+	batchSkew                 uint64        // max number of batches to get ahead
+	batchBytes                uint64        // max number of bytes in a batch
+	maxTime                   time.Duration // maximum time to wait before submitting a batch
+	submitTime                time.Duration // how long it takes to submit a batch
+	produceTime               time.Duration // time between producing blocks
+	produceBytes              int           // range of how many bytes each block (+ commit) is
+	submissionHaltTime        time.Duration // how long to simulate batch submission failing/halting
+	submissionHaltProbability float64       // probability of submission failing and causing a temporary halt
+}
+
+func testSubmitLoop(t *testing.T,
+	args testArgs,
+) {
+	var wg sync.WaitGroup
+	for range args.nParallel {
+		wg.Add(1)
+		go func() {
+			testSubmitLoopInner(t, args)
+			wg.Done()
+		}()
+	}
+	wg.Wait()
+}
+
+func testSubmitLoopInner(
+	t *testing.T,
+	args testArgs,
+) {
+	ctx, cancel := context.WithTimeout(context.Background(), args.testDuration)
+	defer cancel()
+
+	// returns a duration in [0.8,1.2] * d
+	approx := func(d time.Duration) time.Duration {
+		base := int(float64(d) * 0.8)
+		factor := int(float64(d) * 0.4)
+		return time.Duration(base + rand.Intn(factor))
+	}
+
+	nProducedBytes := 
atomic.Uint64{} // tracking how many actual bytes have been produced but not submitted so far + producedBytesC := make(chan int) // producer sends on here, and can be blocked by not consuming from here + + // the time of the last block produced or the last batch submitted or the last starting of the node + timeLastProgress := atomic.Int64{} + + go func() { // simulate block production + go func() { // another thread to check system properties + for { + select { + case <-ctx.Done(): + return + default: + } + // producer shall not get too far ahead + absoluteMax := (args.batchSkew + 1) * args.batchBytes // +1 is because the producer is always blocked after the fact + require.True(t, nProducedBytes.Load() < absoluteMax) + } + }() + for { + select { + case <-ctx.Done(): + return + default: + } + time.Sleep(approx(args.produceTime)) + nBytes := rand.Intn(args.produceBytes) // simulate block production + nProducedBytes.Add(uint64(nBytes)) + producedBytesC <- nBytes + + timeLastProgress.Store(time.Now().Unix()) + } + }() + + submitBatch := func(maxSize uint64) (uint64, error) { // mock the batch submission + time.Sleep(approx(args.submitTime)) + if rand.Float64() < args.submissionHaltProbability { + time.Sleep(args.submissionHaltTime) + timeLastProgress.Store(time.Now().Unix()) // we have now recovered + } + consumed := rand.Intn(int(maxSize)) + nProducedBytes.Add(^uint64(consumed - 1)) // subtract + + timeLastProgressT := time.Unix(timeLastProgress.Load(), 0) + absoluteMax := int64(1.5 * float64(args.maxTime)) // allow some leeway for code execution + require.True(t, time.Since(timeLastProgressT).Milliseconds() < absoluteMax) + + timeLastProgress.Store(time.Now().Unix()) // we have submitted batch + return uint64(consumed), nil + } + + block.SubmitLoopInner( + ctx, + producedBytesC, + args.batchSkew, + args.maxTime, + args.batchBytes, + submitBatch, + ) +} + +// Make sure the producer does not get too far ahead +func TestSubmitLoopFastProducerHaltingSubmitter(t *testing.T) { + testSubmitLoop( + t, + testArgs{ + nParallel: 100, + testDuration: 2 * time.Second, + batchSkew: 10, + batchBytes: 100, + maxTime: 10 * time.Millisecond, + submitTime: 2 * time.Millisecond, + produceBytes: 20, + produceTime: 2 * time.Millisecond, + // a relatively long possibility of the submitter halting + // tests the case where we need to stop the producer getting too far ahead + submissionHaltTime: 50 * time.Millisecond, + submissionHaltProbability: 0.01, + }, + ) +} + +// Make sure the timer works even if the producer is slow +func TestSubmitLoopTimer(t *testing.T) { + testSubmitLoop( + t, + testArgs{ + nParallel: 100, + testDuration: 2 * time.Second, + batchSkew: 10, + batchBytes: 100, + maxTime: 10 * time.Millisecond, + submitTime: 2 * time.Millisecond, + produceBytes: 20, + // a relatively long production time ensures we test the + // case where the producer is slow but we want to submit anyway due to time + produceTime: 50 * time.Millisecond, + }, + ) +} diff --git a/block/submit_test.go b/block/submit_test.go index ef5e24a75..37596fff8 100644 --- a/block/submit_test.go +++ b/block/submit_test.go @@ -76,10 +76,8 @@ func TestBatchOverhead(t *testing.T) { commit := commits[0] batch := types.Batch{ - StartHeight: 1, - EndHeight: 1, - Blocks: blocks, - Commits: commits, + Blocks: blocks, + Commits: commits, } batchSize := batch.ToProto().Size() @@ -124,7 +122,7 @@ func TestBatchSubmissionHappyFlow(t *testing.T) { assert.Zero(t, manager.LastSubmittedHeight.Load()) // submit and validate sync target - 
manager.HandleSubmissionTrigger() + manager.CreateAndSubmitBatch(manager.Conf.BatchMaxSizeBytes) assert.EqualValues(t, manager.State.Height(), manager.LastSubmittedHeight.Load()) } @@ -172,11 +170,12 @@ func TestBatchSubmissionFailedSubmission(t *testing.T) { // try to submit, we expect failure slmock.On("SubmitBatch", mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("submit batch")).Once() - assert.Error(t, manager.HandleSubmissionTrigger()) + _, err = manager.CreateAndSubmitBatch(manager.Conf.BatchMaxSizeBytes) + assert.Error(t, err) // try to submit again, we expect success slmock.On("SubmitBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() - manager.HandleSubmissionTrigger() + manager.CreateAndSubmitBatch(manager.Conf.BatchMaxSizeBytes) assert.EqualValues(t, manager.State.Height(), manager.LastSubmittedHeight.Load()) } @@ -198,11 +197,11 @@ func TestSubmissionByTime(t *testing.T) { // Init manager with empty blocks feature enabled managerConfig := config.BlockManagerConfig{ - BlockTime: blockTime, - MaxIdleTime: 0, - MaxSupportedBatchSkew: 10, - BatchSubmitMaxTime: submitTimeout, - BlockBatchMaxSizeBytes: 1000, + BlockTime: blockTime, + MaxIdleTime: 0, + MaxBatchSkew: 10, + BatchSubmitMaxTime: submitTimeout, + BatchMaxSizeBytes: 1000, } manager, err := testutil.GetManager(managerConfig, nil, nil, 1, 1, 0, proxyApp, nil) @@ -219,13 +218,14 @@ func TestSubmissionByTime(t *testing.T) { wg.Add(2) // Add 2 because we have 2 goroutines + bytesProducedC := make(chan int) go func() { - manager.ProduceBlockLoop(mCtx) + manager.ProduceBlockLoop(mCtx, bytesProducedC) wg.Done() // Decrease counter when this goroutine finishes }() go func() { - manager.SubmitLoop(mCtx) + manager.SubmitLoop(mCtx, bytesProducedC) wg.Done() // Decrease counter when this goroutine finishes }() @@ -257,7 +257,7 @@ func TestSubmissionByBatchSize(t *testing.T) { for _, c := range cases { managerConfig := testutil.GetManagerConfig() - managerConfig.BlockBatchMaxSizeBytes = c.blockBatchMaxSizeBytes + managerConfig.BatchMaxSizeBytes = c.blockBatchMaxSizeBytes manager, err := testutil.GetManager(managerConfig, nil, nil, 1, 1, 0, nil, nil) require.NoError(err) @@ -274,14 +274,16 @@ func submissionByBatchSize(manager *block.Manager, assert *assert.Assertions, ex ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() + bytesProducedC := make(chan int) + go func() { - manager.ProduceBlockLoop(ctx) + manager.ProduceBlockLoop(ctx, bytesProducedC) wg.Done() // Decrease counter when this goroutine finishes }() go func() { assert.Zero(manager.LastSubmittedHeight.Load()) - manager.SubmitLoop(ctx) + manager.SubmitLoop(ctx, bytesProducedC) wg.Done() // Decrease counter when this goroutine finishes }() diff --git a/config/config.go b/config/config.go index c643f9344..6efa11e6d 100644 --- a/config/config.go +++ b/config/config.go @@ -51,12 +51,12 @@ type BlockManagerConfig struct { MaxIdleTime time.Duration `mapstructure:"max_idle_time"` // MaxProofTime defines the max time to be idle, if txs that requires proof were included in last block MaxProofTime time.Duration `mapstructure:"max_proof_time"` - // BatchSubmitMaxTime defines how long should block manager wait for before submitting batch + // BatchSubmitMaxTime is how long should block manager wait for before submitting batch BatchSubmitMaxTime time.Duration `mapstructure:"batch_submit_max_time"` - // Max amount of pending batches to be submitted. block production will be paused if this limit is reached. 
-	MaxSupportedBatchSkew uint64 `mapstructure:"max_supported_batch_skew"`
-	// The size of the batch in Bytes. Every batch we'll write to the DA and the settlement layer.
-	BlockBatchMaxSizeBytes uint64 `mapstructure:"block_batch_max_size_bytes"`
+	// MaxBatchSkew is the maximum number of batches that can be pending submission. Block production will be paused if this limit is reached.
+	MaxBatchSkew uint64 `mapstructure:"max_supported_batch_skew"`
+	// BatchMaxSizeBytes is the maximum size, in bytes, of a batch of blocks and commits. Every batch is written to the DA and the settlement layer.
+	BatchMaxSizeBytes uint64 `mapstructure:"block_batch_max_size_bytes"`
 }
 // GetViperConfig reads configuration parameters from Viper instance.
@@ -153,11 +153,11 @@ func (c BlockManagerConfig) Validate() error {
 		return fmt.Errorf("batch_submit_max_time must be greater than max_idle_time")
 	}
-	if c.BlockBatchMaxSizeBytes <= 0 {
+	if c.BatchMaxSizeBytes <= 0 {
 		return fmt.Errorf("block_batch_size_bytes must be positive")
 	}
-	if c.MaxSupportedBatchSkew <= 0 {
+	if c.MaxBatchSkew <= 0 {
 		return fmt.Errorf("max_supported_batch_skew must be positive")
 	}
diff --git a/config/config_test.go b/config/config_test.go
index c94098b35..8b34b2551 100644
--- a/config/config_test.go
+++ b/config/config_test.go
@@ -35,7 +35,7 @@ func TestViperAndCobra(t *testing.T) {
 	assert.Equal("foobar", nc.DALayer)
 	assert.Equal(`{"json":true}`, nc.DAConfig)
 	assert.Equal(1234*time.Second, nc.BlockTime)
-	assert.Equal(uint64(1000), nc.BlockManagerConfig.BlockBatchMaxSizeBytes)
+	assert.Equal(uint64(1000), nc.BlockManagerConfig.BatchMaxSizeBytes)
 }
 func TestNodeConfig_Validate(t *testing.T) {
@@ -84,7 +84,7 @@ func TestNodeConfig_Validate(t *testing.T) {
 		}, {
 			name: "missing block batch max size bytes",
 			malleate: func(nc *config.NodeConfig) {
-				nc.BlockManagerConfig.BlockBatchMaxSizeBytes = 0
+				nc.BlockManagerConfig.BatchMaxSizeBytes = 0
 			},
 			wantErr: assert.Error,
 		}, {
@@ -185,12 +185,12 @@ func TestNodeConfig_Validate(t *testing.T) {
 func fullNodeConfig() config.NodeConfig {
 	return config.NodeConfig{
 		BlockManagerConfig: config.BlockManagerConfig{
-			BlockTime:              1 * time.Second,
-			MaxIdleTime:            20 * time.Second,
-			MaxProofTime:           20 * time.Second,
-			BatchSubmitMaxTime:     20 * time.Second,
-			MaxSupportedBatchSkew:  10,
-			BlockBatchMaxSizeBytes: 10000,
+			BlockTime:          1 * time.Second,
+			MaxIdleTime:        20 * time.Second,
+			MaxProofTime:       20 * time.Second,
+			BatchSubmitMaxTime: 20 * time.Second,
+			MaxBatchSkew:       10,
+			BatchMaxSizeBytes:  10000,
 		},
 		DALayer:  "celestia",
 		DAConfig: "da-config",
diff --git a/config/defaults.go b/config/defaults.go
index 516f3155d..c0c9ec16b 100644
--- a/config/defaults.go
+++ b/config/defaults.go
@@ -23,12 +23,12 @@ var DefaultNodeConfig = *DefaultConfig("", "")
 func DefaultConfig(home, chainId string) *NodeConfig {
 	cfg := &NodeConfig{
 		BlockManagerConfig: BlockManagerConfig{
-			BlockTime:              200 * time.Millisecond,
-			MaxIdleTime:            3600 * time.Second,
-			MaxProofTime:           100 * time.Second,
-			BatchSubmitMaxTime:     3600 * time.Second,
-			MaxSupportedBatchSkew:  20,
-			BlockBatchMaxSizeBytes: 500000,
+			BlockTime:          200 * time.Millisecond,
+			MaxIdleTime:        3600 * time.Second,
+			MaxProofTime:       100 * time.Second,
+			BatchSubmitMaxTime: 3600 * time.Second,
+			MaxBatchSkew:       20,
+			BatchMaxSizeBytes:  500000,
 		},
 		DALayer:         "mock",
 		SettlementLayer: "mock",
diff --git a/config/flags.go b/config/flags.go
index 816563ab3..d9bc0a3d7 100644
--- a/config/flags.go
+++ b/config/flags.go
@@ -48,7 +48,7 @@ func AddNodeFlags(cmd *cobra.Command) {
 	cmd.Flags().Duration(FlagBlockTime, def.BlockTime, "block time 
(for sequencer mode)") cmd.Flags().Duration(FlagMaxIdleTime, def.MaxIdleTime, "max time for empty blocks (for sequencer mode)") cmd.Flags().Duration(FlagBatchSubmitMaxTime, def.BatchSubmitMaxTime, "max time for batch submit (for sequencer mode)") - cmd.Flags().Uint64(FlagBlockBatchMaxSizeBytes, def.BlockBatchMaxSizeBytes, "block batch size in bytes") + cmd.Flags().Uint64(FlagBlockBatchMaxSizeBytes, def.BatchMaxSizeBytes, "block batch size in bytes") cmd.Flags().String(FlagSettlementLayer, def.SettlementLayer, "Settlement Layer Client name") cmd.Flags().String(FlagSLNodeAddress, def.SettlementConfig.NodeAddress, "Settlement Layer RPC node address") cmd.Flags().String(FlagSLKeyringBackend, def.SettlementConfig.KeyringBackend, "Sequencer keyring backend") diff --git a/config/toml.go b/config/toml.go index 43af9946c..7c3236eab 100644 --- a/config/toml.go +++ b/config/toml.go @@ -70,14 +70,14 @@ block_time = "{{ .BlockManagerConfig.BlockTime }}" # block production interval in case of no transactions ("0s" produces empty blocks) max_idle_time = "{{ .BlockManagerConfig.MaxIdleTime }}" max_proof_time = "{{ .BlockManagerConfig.MaxProofTime }}" -max_supported_batch_skew = {{ .BlockManagerConfig.MaxSupportedBatchSkew }} +max_supported_batch_skew = {{ .BlockManagerConfig.MaxBatchSkew }} # triggers to submit batch to DA and settlement (both required) batch_submit_max_time = "{{ .BlockManagerConfig.BatchSubmitMaxTime }}" # max size of batch in bytes that can be accepted by DA -block_batch_max_size_bytes = {{ .BlockManagerConfig.BlockBatchMaxSizeBytes }} +block_batch_max_size_bytes = {{ .BlockManagerConfig.BatchMaxSizeBytes }} ### da config ### da_layer = "{{ .DALayer }}" # mock, celestia, avail diff --git a/da/avail/avail_test.go b/da/avail/avail_test.go index 5cac5df35..26097a853 100644 --- a/da/avail/avail_test.go +++ b/da/avail/avail_test.go @@ -9,7 +9,6 @@ import ( "github.com/dymensionxyz/dymint/da/avail" mocks "github.com/dymensionxyz/dymint/mocks/github.com/dymensionxyz/dymint/da/avail" "github.com/dymensionxyz/dymint/testutil" - "github.com/dymensionxyz/dymint/types" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -20,103 +19,16 @@ const ( seed = "copper mother insect grunt blue cute tell side welcome domain border oxygen" ) -// FIXME(omritoptix): This test is currently not working as I couldn't find a way to mock the SubmitAndWatchExtrinsic function. -// func TestSubmitBatch(t *testing.T) { -// assert := assert.New(t) -// require := require.New(t) -// configBytes, err := json.Marshal(avail.Config{ -// Seed: seed, -// }) -// require.NoError(err) -// // Create mock clients -// mockSubstrateApiClient := mocks.NewSubstrateApiI(t) -// // Configure DALC options -// options := []da.Option{ -// avail.WithClient(mockSubstrateApiClient), -// avail.WithBatchRetryAttempts(1), -// avail.WithBatchRetryDelay(1 * time.Second), -// avail.WithTxInclusionTimeout(1 * time.Second), -// } -// // Subscribe to the health status event -// pubsubServer := pubsub.NewServer() -// pubsubServer.Start() -// // HealthSubscription, err := pubsubServer.Subscribe(context.Background(), "testSubmitBatch", da.EventQueryDAHealthStatus) -// assert.NoError(err) -// // Start the DALC -// dalc := avail.DataAvailabilityLayerClient{} -// err = dalc.Init(configBytes, pubsubServer, nil, test.NewLogger(t), options...) 
-// require.NoError(err) -// err = dalc.Start() -// require.NoError(err) -// // Set the mock functions -// metadata := availtypes.NewMetadataV14() -// metadata.AsMetadataV14 = availtypes.MetadataV14{ -// Pallets: []availtypes.PalletMetadataV14{ -// { -// Name: "DataAvailability", -// HasCalls: true, -// }, -// { -// Name: "System", -// HasStorage: true, -// Storage: availtypes.StorageMetadataV14{ -// Prefix: "System", -// Items: []availtypes.StorageEntryMetadataV14{ -// { -// Name: "Account", -// Type: availtypes.StorageEntryTypeV14{ -// IsPlainType: true, -// IsMap: true, -// AsMap: availtypes.MapTypeV14{ -// Hashers: []availtypes.StorageHasherV10{ -// { -// IsIdentity: true, -// }, -// }, -// }, -// }, -// }, -// }, -// }, -// }, -// }, -// EfficientLookup: map[int64]*availtypes.Si1Type{ -// 0: { -// Def: availtypes.Si1TypeDef{ -// Variant: availtypes.Si1TypeDefVariant{ -// Variants: []availtypes.Si1Variant{ -// { -// Name: "submit_data", -// }, -// }, -// }, -// }, -// }, -// }, -// } +// TODO: there was another test here, it should be brought back https://github.com/dymensionxyz/dymint/issues/970 -// mockSubstrateApiClient.On("GetMetadataLatest").Return(metadata, nil) -// mockSubstrateApiClient.On("GetBlockHash", mock.Anything).Return(availtypes.NewHash([]byte("123")), nil) -// mockSubstrateApiClient.On("GetRuntimeVersionLatest").Return(availtypes.NewRuntimeVersion(), nil) -// mockSubstrateApiClient.On("GetStorageLatest", mock.Anything, mock.Anything).Return(true, nil) -// mockSubstrateApiClient.On("SubmitAndWatchExtrinsic", mock.Anything).Return(nil, nil) -// batch := &types.Batch{ -// StartHeight: 0, -// EndHeight: 1, -// } -// res := dalc.SubmitBatch(batch) -// assert.Equal(res.Code, da.StatusSuccess) - -// } - -// TestRetriveBatches tests the RetrieveBatches function manages +// TestRetrieveBatches tests the RetrieveBatches function manages // to decode the batches from the block extrinsics and only returns // the batches relevant for our app id and method index. 
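Because StartHeight and EndHeight are now derived from a batch's first and last blocks rather than stored as fields, tests like this one depend on serialization preserving the blocks themselves. A sketch of that round-trip invariant, assuming the MarshalBinary/UnmarshalBinary pair on types.Batch and the testutil helper used below:

```go
package example_test

import (
	"testing"

	"github.com/dymensionxyz/dymint/testutil"
	"github.com/dymensionxyz/dymint/types"
	"github.com/stretchr/testify/require"
)

// Round-trip sketch: heights are derived from the first/last block,
// so they must survive MarshalBinary/UnmarshalBinary (assumed API).
func TestBatchHeightsSurviveRoundTrip(t *testing.T) {
	batch := testutil.MustGenerateBatchAndKey(2, 3) // blocks for heights 2..3

	bz, err := batch.MarshalBinary()
	require.NoError(t, err)

	var got types.Batch
	require.NoError(t, got.UnmarshalBinary(bz))

	require.Equal(t, batch.StartHeight(), got.StartHeight())
	require.Equal(t, batch.EndHeight(), got.EndHeight())
	require.Equal(t, batch.NumBlocks(), got.NumBlocks())
}
```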
-func TestRetriveBatches(t *testing.T) { +func TestRetrieveBatches(t *testing.T) { assert := assert.New(t) require := require.New(t) const appId = 123 - // Setup the config + // Set up the config configBytes, err := json.Marshal(avail.Config{ Seed: seed, AppID: int64(appId), @@ -140,8 +52,8 @@ func TestRetriveBatches(t *testing.T) { // Set the mock functions mockSubstrateApiClient.On("GetBlockHash", mock.Anything).Return(availtypes.NewHash([]byte("123")), nil) // Build batches for the block extrinsics - batch1 := types.Batch{StartHeight: 0, EndHeight: 1} - batch2 := types.Batch{StartHeight: 2, EndHeight: 3} + batch1 := testutil.MustGenerateBatchAndKey(0, 1) + batch2 := testutil.MustGenerateBatchAndKey(2, 3) batch1bytes, err := batch1.MarshalBinary() require.NoError(err) batch2bytes, err := batch2.MarshalBinary() @@ -181,5 +93,5 @@ func TestRetriveBatches(t *testing.T) { } batchResult := dalc.RetrieveBatches(daMetaData) assert.Equal(1, len(batchResult.Batches)) - assert.Equal(batch1.StartHeight, batchResult.Batches[0].StartHeight) + assert.Equal(batch1.StartHeight(), batchResult.Batches[0].StartHeight()) } diff --git a/da/celestia/celestia_test.go b/da/celestia/celestia_test.go index a1777a10b..862832e3b 100644 --- a/da/celestia/celestia_test.go +++ b/da/celestia/celestia_test.go @@ -40,14 +40,10 @@ func TestDALC(t *testing.T) { block1 := getRandomBlock(1, 10) block2 := getRandomBlock(2, 10) batch1 := &types.Batch{ - StartHeight: block1.Header.Height, - EndHeight: block1.Header.Height, - Blocks: []*types.Block{block1}, + Blocks: []*types.Block{block1}, } batch2 := &types.Batch{ - StartHeight: block2.Header.Height, - EndHeight: block2.Header.Height, - Blocks: []*types.Block{block2}, + Blocks: []*types.Block{block2}, } nIDSize := 1 @@ -102,9 +98,7 @@ func TestRetrievalNotFound(t *testing.T) { // only blocks b1 and b2 will be submitted to DA block1 := getRandomBlock(1, 10) batch1 := &types.Batch{ - StartHeight: block1.Header.Height, - EndHeight: block1.Header.Height, - Blocks: []*types.Block{block1}, + Blocks: []*types.Block{block1}, } nIDSize := 1 @@ -146,9 +140,7 @@ func TestRetrievalNoCommitment(t *testing.T) { mockRPCClient, dalc, nID, _ := setDAandMock(t) block1 := getRandomBlock(1, 10) batch1 := &types.Batch{ - StartHeight: block1.Header.Height, - EndHeight: block1.Header.Height, - Blocks: []*types.Block{block1}, + Blocks: []*types.Block{block1}, } // only blocks b1 and b2 will be submitted to DA data1, _ := batch1.MarshalBinary() @@ -175,9 +167,7 @@ func TestAvalabilityOK(t *testing.T) { // only blocks b1 and b2 will be submitted to DA block1 := getRandomBlock(1, 10) batch1 := &types.Batch{ - StartHeight: block1.Header.Height, - EndHeight: block1.Header.Height, - Blocks: []*types.Block{block1}, + Blocks: []*types.Block{block1}, } nIDSize := 1 @@ -219,9 +209,7 @@ func TestAvalabilityWrongProof(t *testing.T) { // only blocks b1 and b2 will be submitted to DA block1 := getRandomBlock(1, 10) batch1 := &types.Batch{ - StartHeight: block1.Header.Height, - EndHeight: block1.Header.Height, - Blocks: []*types.Block{block1}, + Blocks: []*types.Block{block1}, } nIDSize := 1 @@ -345,8 +333,8 @@ func compareBlocks(t *testing.T, b1, b2 *types.Block) { func compareBatches(t *testing.T, b1, b2 *types.Batch) { t.Helper() - assert.Equal(t, b1.StartHeight, b2.StartHeight) - assert.Equal(t, b1.EndHeight, b2.EndHeight) + assert.Equal(t, b1.StartHeight(), b2.StartHeight()) + assert.Equal(t, b1.EndHeight(), b2.EndHeight()) assert.Equal(t, len(b1.Blocks), len(b2.Blocks)) for i := range b1.Blocks { 
compareBlocks(t, b1.Blocks[i], b2.Blocks[i]) diff --git a/da/celestia/rpc_test.go b/da/celestia/rpc_test.go index 7b854cc9f..7b6f9eee6 100644 --- a/da/celestia/rpc_test.go +++ b/da/celestia/rpc_test.go @@ -17,7 +17,6 @@ import ( "github.com/dymensionxyz/dymint/da/celestia" mocks "github.com/dymensionxyz/dymint/mocks/github.com/dymensionxyz/dymint/da/celestia/types" "github.com/dymensionxyz/dymint/testutil" - "github.com/dymensionxyz/dymint/types" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -53,10 +52,7 @@ func TestSubmitBatch(t *testing.T) { require := require.New(t) configBytes, err := json.Marshal(celestia.TestConfig) require.NoError(err) - batch := &types.Batch{ - StartHeight: 0, - EndHeight: 1, - } + batch := testutil.MustGenerateBatchAndKey(0, 1) nIDSize := 1 tree := exampleNMT(nIDSize, true, 1, 2, 3, 4) @@ -71,7 +67,7 @@ func TestSubmitBatch(t *testing.T) { cases := []struct { name string submitPFBReturn []interface{} - sumbitPFDRun func(args mock.Arguments) + submitPFBRun func(args mock.Arguments) expectedInclusionHeight uint64 getProofReturn []interface{} getProofDRun func(args mock.Arguments) @@ -83,7 +79,7 @@ func TestSubmitBatch(t *testing.T) { submitPFBReturn: []interface{}{uint64(1234), nil}, getProofReturn: []interface{}{&blobProof, nil}, includedReturn: []interface{}{true, nil}, - sumbitPFDRun: func(args mock.Arguments) { time.Sleep(10 * time.Millisecond) }, + submitPFBRun: func(args mock.Arguments) { time.Sleep(10 * time.Millisecond) }, getProofDRun: func(args mock.Arguments) { time.Sleep(10 * time.Millisecond) }, includedRun: func(args mock.Arguments) { time.Sleep(10 * time.Millisecond) }, expectedInclusionHeight: uint64(1234), @@ -93,7 +89,7 @@ func TestSubmitBatch(t *testing.T) { submitPFBReturn: []interface{}{uint64(0), timeOutErr}, getProofReturn: []interface{}{&blobProof, nil}, includedReturn: []interface{}{true, nil}, - sumbitPFDRun: func(args mock.Arguments) { time.Sleep(10 * time.Millisecond) }, + submitPFBRun: func(args mock.Arguments) { time.Sleep(10 * time.Millisecond) }, getProofDRun: func(args mock.Arguments) { time.Sleep(10 * time.Millisecond) }, includedRun: func(args mock.Arguments) { time.Sleep(10 * time.Millisecond) }, }, @@ -128,7 +124,7 @@ func TestSubmitBatch(t *testing.T) { DAH: dah, } - mockRPCClient.On(submitPFBFuncName, mock.Anything, mock.Anything, mock.Anything).Return(tc.submitPFBReturn...).Run(tc.sumbitPFDRun) + mockRPCClient.On(submitPFBFuncName, mock.Anything, mock.Anything, mock.Anything).Return(tc.submitPFBReturn...).Run(tc.submitPFBRun) if tc.name == "TestSubmitPFBResponseCodeSuccess" { mockRPCClient.On(getProofFuncName, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(tc.getProofReturn...).Run(tc.getProofDRun) mockRPCClient.On(includedFuncName, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(tc.includedReturn...).Run(tc.includedRun) diff --git a/da/da_test.go b/da/da_test.go index 50aa4c665..ac7f084da 100644 --- a/da/da_test.go +++ b/da/da_test.go @@ -82,14 +82,10 @@ func doTestDALC(t *testing.T, mockDalc da.DataAvailabilityLayerClient) { block1 := getRandomBlock(1, 10) block2 := getRandomBlock(2, 10) batch1 := &types.Batch{ - StartHeight: block1.Header.Height, - EndHeight: block1.Header.Height, - Blocks: []*types.Block{block1}, + Blocks: []*types.Block{block1}, } batch2 := &types.Batch{ - StartHeight: block2.Header.Height, - EndHeight: block2.Header.Height, - Blocks: []*types.Block{block2}, + Blocks: 
[]*types.Block{block2}, } resp := dalc.SubmitBatch(batch1) @@ -166,9 +162,7 @@ func doTestRetrieve(t *testing.T, dalc da.DataAvailabilityLayerClient) { for i := uint64(0); i < 100; i++ { b := getRandomBlock(i, rand.Int()%20) batch := &types.Batch{ - StartHeight: i, - EndHeight: i, - Blocks: []*types.Block{b}, + Blocks: []*types.Block{b}, Commits: []*types.Commit{ { Height: b.Header.Height, diff --git a/da/local/local.go b/da/local/local.go index f4b99318b..8c7a2f381 100644 --- a/da/local/local.go +++ b/da/local/local.go @@ -89,14 +89,14 @@ func (m *DataAvailabilityLayerClient) GetClientType() da.Client { func (m *DataAvailabilityLayerClient) SubmitBatch(batch *types.Batch) da.ResultSubmitBatch { daHeight := m.daHeight.Load() - m.logger.Debug("Submitting batch to DA layer", "start height", batch.StartHeight, "end height", batch.EndHeight, "da height", daHeight) + m.logger.Debug("Submitting batch to DA layer", "start height", batch.StartHeight(), "end height", batch.EndHeight(), "da height", daHeight) blob, err := batch.MarshalBinary() if err != nil { return da.ResultSubmitBatch{BaseResult: da.BaseResult{Code: da.StatusError, Message: err.Error(), Error: err}} } - hash := sha1.Sum(uint64ToBinary(batch.EndHeight)) //#nosec - err = m.dalcKV.Set(getKey(daHeight, batch.StartHeight), hash[:]) + hash := sha1.Sum(uint64ToBinary(batch.EndHeight())) //#nosec + err = m.dalcKV.Set(getKey(daHeight, batch.StartHeight()), hash[:]) if err != nil { return da.ResultSubmitBatch{BaseResult: da.BaseResult{Code: da.StatusError, Message: err.Error(), Error: err}} } diff --git a/node/node_test.go b/node/node_test.go index 448ba7ff6..3706ccb47 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -73,10 +73,10 @@ func TestMempoolDirectly(t *testing.T) { RPC: config.RPCConfig{}, MempoolConfig: *tmcfg.DefaultMempoolConfig(), BlockManagerConfig: config.BlockManagerConfig{ - BlockTime: 1 * time.Second, - BatchSubmitMaxTime: 60 * time.Second, - BlockBatchMaxSizeBytes: 100000, - MaxSupportedBatchSkew: 10, + BlockTime: 1 * time.Second, + BatchSubmitMaxTime: 60 * time.Second, + BatchMaxSizeBytes: 100000, + MaxBatchSkew: 10, }, DALayer: "mock", DAConfig: "", diff --git a/rpc/client/client_test.go b/rpc/client/client_test.go index 8a4d72c6e..46db61a05 100644 --- a/rpc/client/client_test.go +++ b/rpc/client/client_test.go @@ -110,10 +110,10 @@ func TestGenesisChunked(t *testing.T) { }, RPC: config.RPCConfig{}, BlockManagerConfig: config.BlockManagerConfig{ - BlockTime: 100 * time.Millisecond, - BatchSubmitMaxTime: 60 * time.Second, - BlockBatchMaxSizeBytes: 1000, - MaxSupportedBatchSkew: 10, + BlockTime: 100 * time.Millisecond, + BatchSubmitMaxTime: 60 * time.Second, + BatchMaxSizeBytes: 1000, + MaxBatchSkew: 10, }, DALayer: "mock", DAConfig: "", @@ -710,10 +710,10 @@ func TestValidatorSetHandling(t *testing.T) { BootstrapRetryTime: 30 * time.Second, }, BlockManagerConfig: config.BlockManagerConfig{ - BlockTime: 10 * time.Millisecond, - BatchSubmitMaxTime: 60 * time.Second, - BlockBatchMaxSizeBytes: 1000, - MaxSupportedBatchSkew: 10, + BlockTime: 10 * time.Millisecond, + BatchSubmitMaxTime: 60 * time.Second, + BatchMaxSizeBytes: 1000, + MaxBatchSkew: 10, }, SettlementConfig: settlement.Config{ ProposerPubKey: hex.EncodeToString(proposerPubKeyBytes), @@ -869,10 +869,10 @@ func getRPCInternal(t *testing.T, sequencer bool) (*tmmocks.MockApplication, *cl RPC: config.RPCConfig{}, MempoolConfig: *tmcfg.DefaultMempoolConfig(), BlockManagerConfig: config.BlockManagerConfig{ - BlockTime: 100 * time.Millisecond, - 
diff --git a/node/node_test.go b/node/node_test.go
index 448ba7ff6..3706ccb47 100644
--- a/node/node_test.go
+++ b/node/node_test.go
@@ -73,10 +73,10 @@ func TestMempoolDirectly(t *testing.T) {
 		RPC:           config.RPCConfig{},
 		MempoolConfig: *tmcfg.DefaultMempoolConfig(),
 		BlockManagerConfig: config.BlockManagerConfig{
-			BlockTime:              1 * time.Second,
-			BatchSubmitMaxTime:     60 * time.Second,
-			BlockBatchMaxSizeBytes: 100000,
-			MaxSupportedBatchSkew:  10,
+			BlockTime:          1 * time.Second,
+			BatchSubmitMaxTime: 60 * time.Second,
+			BatchMaxSizeBytes:  100000,
+			MaxBatchSkew:       10,
 		},
 		DALayer:  "mock",
 		DAConfig: "",
diff --git a/rpc/client/client_test.go b/rpc/client/client_test.go
index 8a4d72c6e..46db61a05 100644
--- a/rpc/client/client_test.go
+++ b/rpc/client/client_test.go
@@ -110,10 +110,10 @@ func TestGenesisChunked(t *testing.T) {
 		},
 		RPC: config.RPCConfig{},
 		BlockManagerConfig: config.BlockManagerConfig{
-			BlockTime:              100 * time.Millisecond,
-			BatchSubmitMaxTime:     60 * time.Second,
-			BlockBatchMaxSizeBytes: 1000,
-			MaxSupportedBatchSkew:  10,
+			BlockTime:          100 * time.Millisecond,
+			BatchSubmitMaxTime: 60 * time.Second,
+			BatchMaxSizeBytes:  1000,
+			MaxBatchSkew:       10,
 		},
 		DALayer:  "mock",
 		DAConfig: "",
@@ -710,10 +710,10 @@ func TestValidatorSetHandling(t *testing.T) {
 			BootstrapRetryTime: 30 * time.Second,
 		},
 		BlockManagerConfig: config.BlockManagerConfig{
-			BlockTime:              10 * time.Millisecond,
-			BatchSubmitMaxTime:     60 * time.Second,
-			BlockBatchMaxSizeBytes: 1000,
-			MaxSupportedBatchSkew:  10,
+			BlockTime:          10 * time.Millisecond,
+			BatchSubmitMaxTime: 60 * time.Second,
+			BatchMaxSizeBytes:  1000,
+			MaxBatchSkew:       10,
 		},
 		SettlementConfig: settlement.Config{
 			ProposerPubKey: hex.EncodeToString(proposerPubKeyBytes),
@@ -869,10 +869,10 @@ func getRPCInternal(t *testing.T, sequencer bool) (*tmmocks.MockApplication, *cl
 		RPC:           config.RPCConfig{},
 		MempoolConfig: *tmcfg.DefaultMempoolConfig(),
 		BlockManagerConfig: config.BlockManagerConfig{
-			BlockTime:              100 * time.Millisecond,
-			BatchSubmitMaxTime:     60 * time.Second,
-			BlockBatchMaxSizeBytes: 1000,
-			MaxSupportedBatchSkew:  10,
+			BlockTime:          100 * time.Millisecond,
+			BatchSubmitMaxTime: 60 * time.Second,
+			BatchMaxSizeBytes:  1000,
+			MaxBatchSkew:       10,
 		},
 		DALayer:  "mock",
 		DAConfig: "",
@@ -978,10 +978,10 @@ func TestMempool2Nodes(t *testing.T) {
 			BootstrapRetryTime: 30 * time.Second,
 		},
 		BlockManagerConfig: config.BlockManagerConfig{
-			BlockTime:              100 * time.Millisecond,
-			BatchSubmitMaxTime:     60 * time.Second,
-			BlockBatchMaxSizeBytes: 1000,
-			MaxSupportedBatchSkew:  10,
+			BlockTime:          100 * time.Millisecond,
+			BatchSubmitMaxTime: 60 * time.Second,
+			BatchMaxSizeBytes:  1000,
+			MaxBatchSkew:       10,
 		},
 		MempoolConfig: *tmcfg.DefaultMempoolConfig(),
 	}, key1, signingKey1, proxy.NewLocalClientCreator(app), &tmtypes.GenesisDoc{ChainID: rollappID}, log.TestingLogger(), mempool.NopMetrics())
@@ -996,10 +996,10 @@
 			RollappID: rollappID,
 		},
 		BlockManagerConfig: config.BlockManagerConfig{
-			BlockTime:              100 * time.Millisecond,
-			BatchSubmitMaxTime:     60 * time.Second,
-			BlockBatchMaxSizeBytes: 1000,
-			MaxSupportedBatchSkew:  10,
+			BlockTime:          100 * time.Millisecond,
+			BatchSubmitMaxTime: 60 * time.Second,
+			BatchMaxSizeBytes:  1000,
+			MaxBatchSkew:       10,
 		},
 		P2PConfig: config.P2PConfig{
 			ListenAddress: "/ip4/127.0.0.1/tcp/9002",
diff --git a/rpc/json/service_test.go b/rpc/json/service_test.go
index 88bbe3a8d..e3c52dfd6 100644
--- a/rpc/json/service_test.go
+++ b/rpc/json/service_test.go
@@ -302,11 +302,11 @@ func getRPC(t *testing.T) (*tmmocks.MockApplication, *client.Client) {
 	config := config.NodeConfig{
 		DALayer: "mock", SettlementLayer: "mock",
 		BlockManagerConfig: config.BlockManagerConfig{
-			BlockTime:              1 * time.Second,
-			MaxIdleTime:            0,
-			MaxSupportedBatchSkew:  10,
-			BatchSubmitMaxTime:     30 * time.Minute,
-			BlockBatchMaxSizeBytes: 1000,
+			BlockTime:          1 * time.Second,
+			MaxIdleTime:        0,
+			MaxBatchSkew:       10,
+			BatchSubmitMaxTime: 30 * time.Minute,
+			BatchMaxSizeBytes:  1000,
 		},
 		SettlementConfig: settlement.Config{
 			ProposerPubKey: hex.EncodeToString(proposerPubKeyBytes),
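All of the test-config churn above is the same mechanical rename: BlockBatchMaxSizeBytes becomes BatchMaxSizeBytes and MaxSupportedBatchSkew becomes MaxBatchSkew, while BlockTime, MaxIdleTime, and BatchSubmitMaxTime keep their names. Under the new names a BlockManagerConfig reads like this (the values are illustrative, not recommendations):

    cfg := config.BlockManagerConfig{
        BlockTime:          200 * time.Millisecond, // block production interval
        MaxIdleTime:        0,                      // as in the tests above
        BatchSubmitMaxTime: 60 * time.Second,       // time trigger for submission
        BatchMaxSizeBytes:  500_000,                // size trigger for submission
        MaxBatchSkew:       10,                     // cap on batches produced but not yet submitted
    }

MaxBatchSkew is the interesting one for this patch: it bounds how far block production may run ahead of settlement submission before the producer is throttled.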
batch.StartHeight, "endHeight", batch.EndHeight) + c.logger.Debug("Batch already accepted", "startHeight", batch.StartHeight(), "endHeight", batch.EndHeight()) return nil } return fmt.Errorf("broadcast batch: %w", err) @@ -197,16 +197,16 @@ func (c *Client) SubmitBatch(batch *types.Batch, daClient da.Client, daResult *d case event := <-subscription.Out(): eventData, _ := event.Data().(*settlement.EventDataNewBatchAccepted) - if eventData.EndHeight != batch.EndHeight { + if eventData.EndHeight != batch.EndHeight() { c.logger.Debug("Received event for a different batch, ignoring.", "event", eventData) continue // continue waiting for acceptance of the current batch } - c.logger.Info("Batch accepted.", "startHeight", batch.StartHeight, "endHeight", batch.EndHeight, "stateIndex", eventData.StateIndex) + c.logger.Info("Batch accepted.", "startHeight", batch.StartHeight(), "endHeight", batch.EndHeight(), "stateIndex", eventData.StateIndex) return nil case <-timer.C: // Check if the batch was accepted by the settlement layer, and we've just missed the event. - includedBatch, err := c.pollForBatchInclusion(batch.EndHeight) + includedBatch, err := c.pollForBatchInclusion(batch.EndHeight()) timer.Reset(c.batchAcceptanceTimeout) // no error, but still not included if err == nil && !includedBatch { @@ -217,9 +217,9 @@ func (c *Client) SubmitBatch(batch *types.Batch, daClient da.Client, daResult *d c.logger.Error( "Timed out waiting for batch inclusion on settlement layer", "startHeight", - batch.StartHeight, + batch.StartHeight(), "endHeight", - batch.EndHeight, + batch.EndHeight(), ) break // breaks the switch case, and goes back to the broadcast loop } @@ -227,16 +227,16 @@ func (c *Client) SubmitBatch(batch *types.Batch, daClient da.Client, daResult *d c.logger.Error( "Wait for batch inclusion", "startHeight", - batch.StartHeight, + batch.StartHeight(), "endHeight", - batch.EndHeight, + batch.EndHeight(), "error", err, ) continue // continue waiting for acceptance of the current batch } // all good - c.logger.Info("Batch accepted", "startHeight", batch.StartHeight, "endHeight", batch.EndHeight) + c.logger.Info("Batch accepted", "startHeight", batch.StartHeight(), "endHeight", batch.EndHeight()) return nil } break // failed waiting for acceptance. 
diff --git a/settlement/dymension/dymension_test.go b/settlement/dymension/dymension_test.go
index 8e78ec2e1..bd7d783f4 100644
--- a/settlement/dymension/dymension_test.go
+++ b/settlement/dymension/dymension_test.go
@@ -236,7 +236,7 @@ func TestPostBatch(t *testing.T) {
 			}
 			rollappQueryClientMock.On("StateInfo", mock.Anything, mock.Anything).Return(
 				&rollapptypes.QueryGetStateInfoResponse{StateInfo: rollapptypes.StateInfo{
-					StartHeight: batch.StartHeight, StateInfoIndex: rollapptypes.StateInfoIndex{Index: 1}, DAPath: daMetaData.ToPath(), NumBlocks: 1,
+					StartHeight: batch.StartHeight(), StateInfoIndex: rollapptypes.StateInfoIndex{Index: 1}, DAPath: daMetaData.ToPath(), NumBlocks: 1,
 				}},
 				nil)
 		} else {
diff --git a/settlement/grpc/grpc.go b/settlement/grpc/grpc.go
index 603d45d17..3b1005ed8 100644
--- a/settlement/grpc/grpc.go
+++ b/settlement/grpc/grpc.go
@@ -173,7 +173,7 @@ func (c *Client) Stop() error {
 	return nil
 }
 
-// PostBatch saves the batch to the kv store
+// SubmitBatch saves the batch to the kv store
 func (c *Client) SubmitBatch(batch *types.Batch, daClient da.Client, daResult *da.ResultSubmitBatch) error {
 	settlementBatch := c.convertBatchtoSettlementBatch(batch, daResult)
 	err := c.saveBatch(settlementBatch)
@@ -263,8 +263,8 @@ func (c *Client) saveBatch(batch *settlement.Batch) error {
 
 func (c *Client) convertBatchtoSettlementBatch(batch *types.Batch, daResult *da.ResultSubmitBatch) *settlement.Batch {
 	settlementBatch := &settlement.Batch{
-		StartHeight: batch.StartHeight,
-		EndHeight:   batch.EndHeight,
+		StartHeight: batch.StartHeight(),
+		EndHeight:   batch.EndHeight(),
 		MetaData: &settlement.BatchMetaData{
 			DA: &da.DASubmitMetaData{
 				Height: daResult.SubmitMetaData.Height,
diff --git a/settlement/local/local.go b/settlement/local/local.go
index 2c7a62c61..cff6c21ba 100644
--- a/settlement/local/local.go
+++ b/settlement/local/local.go
@@ -256,8 +256,8 @@ func (c *Client) retrieveBatchAtStateIndex(slStateIndex uint64) (*settlement.Res
 
 func convertBatchToSettlementBatch(batch *types.Batch, daResult *da.ResultSubmitBatch) *settlement.Batch {
 	settlementBatch := &settlement.Batch{
-		StartHeight: batch.StartHeight,
-		EndHeight:   batch.EndHeight,
+		StartHeight: batch.StartHeight(),
+		EndHeight:   batch.EndHeight(),
 		MetaData: &settlement.BatchMetaData{
 			DA: &da.DASubmitMetaData{
 				Height: daResult.SubmitMetaData.Height,
diff --git a/settlement/local/local_test.go b/settlement/local/local_test.go
index 3268fa2c5..956416975 100644
--- a/settlement/local/local_test.go
+++ b/settlement/local/local_test.go
@@ -57,11 +57,11 @@ func TestSubmitBatch(t *testing.T) {
 	require.Error(err) // no batch should be present
 
 	// Create a batches which will be submitted
-	propserKey, _, err := crypto.GenerateEd25519Key(nil)
+	proposerKey, _, err := crypto.GenerateEd25519Key(nil)
 	require.NoError(err)
-	batch1, err := testutil.GenerateBatch(1, 1, propserKey)
+	batch1, err := testutil.GenerateBatch(1, 1, proposerKey)
 	require.NoError(err)
-	batch2, err := testutil.GenerateBatch(2, 2, propserKey)
+	batch2, err := testutil.GenerateBatch(2, 2, proposerKey)
 	require.NoError(err)
 	resultSubmitBatch := &da.ResultSubmitBatch{}
 	resultSubmitBatch.SubmitMetaData = &da.DASubmitMetaData{}
@@ -74,7 +74,7 @@
 	// Check if the batch was submitted
 	queriedBatch, err := sllayer.GetLatestBatch()
 	require.NoError(err)
-	assert.Equal(batch1.EndHeight, queriedBatch.Batch.EndHeight)
+	assert.Equal(batch1.EndHeight(), queriedBatch.Batch.EndHeight)
 
 	state, err := sllayer.GetHeightState(1)
 	require.NoError(err)
@@ -82,7 +82,7 @@
 
 	queriedBatch, err = sllayer.GetBatchAtIndex(state.State.StateIndex)
 	require.NoError(err)
-	assert.Equal(batch1.EndHeight, queriedBatch.Batch.EndHeight)
+	assert.Equal(batch1.EndHeight(), queriedBatch.Batch.EndHeight)
 
 	// Submit the 2nd batch and check if it was successful
 	err = sllayer.SubmitBatch(batch2, da.Mock, resultSubmitBatch)
@@ -92,7 +92,7 @@
 	// Check if the batch was submitted
 	queriedBatch, err = sllayer.GetLatestBatch()
 	require.NoError(err)
-	assert.Equal(batch2.EndHeight, queriedBatch.Batch.EndHeight)
+	assert.Equal(batch2.EndHeight(), queriedBatch.Batch.EndHeight)
 
 	state, err = sllayer.GetHeightState(2)
 	require.NoError(err)
@@ -100,7 +100,7 @@
 
 	queriedBatch, err = sllayer.GetBatchAtIndex(state.State.StateIndex)
 	require.NoError(err)
-	assert.Equal(batch2.EndHeight, queriedBatch.Batch.EndHeight)
+	assert.Equal(batch2.EndHeight(), queriedBatch.Batch.EndHeight)
 
 	// TODO: test event emitted
 }
@@ -145,7 +145,7 @@ func TestPersistency(t *testing.T) {
 
 	queriedBatch, err := sllayer.GetLatestBatch()
 	require.NoError(err)
-	assert.Equal(batch1.EndHeight, queriedBatch.Batch.EndHeight)
+	assert.Equal(batch1.EndHeight(), queriedBatch.Batch.EndHeight)
 
 	// Restart the layer and check if the batch is still present
 	err = sllayer.Stop()
@@ -154,5 +154,5 @@ func TestPersistency(t *testing.T) {
 	_ = sllayer.Init(cfg, pubsubServer, logger)
 	queriedBatch, err = sllayer.GetLatestBatch()
 	require.NoError(err)
-	assert.Equal(batch1.EndHeight, queriedBatch.Batch.EndHeight)
+	assert.Equal(batch1.EndHeight(), queriedBatch.Batch.EndHeight)
 }
diff --git a/testutil/block.go b/testutil/block.go
index 43550e09c..54bbddb02 100644
--- a/testutil/block.go
+++ b/testutil/block.go
@@ -32,6 +32,7 @@ const (
 /* -------------------------------------------------------------------------- */
 /*                                    utils                                   */
 /* -------------------------------------------------------------------------- */
+
 func GetManagerWithProposerKey(conf config.BlockManagerConfig, proposerKey crypto.PrivKey, settlementlc settlement.ClientI, dalc da.DataAvailabilityLayerClient, genesisHeight, storeInitialHeight, storeLastBlockHeight int64, proxyAppConns proxy.AppConns, mockStore store.Store) (*block.Manager, error) {
 	genesis := GenerateGenesis(genesisHeight)
 	// Change the LastBlockHeight to avoid calling InitChainSync within the manager
@@ -146,9 +147,9 @@ func initSettlementLayerMock(settlementlc settlement.ClientI, proposer string, p
 
 func GetManagerConfig() config.BlockManagerConfig {
 	return config.BlockManagerConfig{
-		BlockTime:              100 * time.Millisecond,
-		BlockBatchMaxSizeBytes: 1000000,
-		BatchSubmitMaxTime:     30 * time.Minute,
-		MaxSupportedBatchSkew:  10,
+		BlockTime:          100 * time.Millisecond,
+		BatchMaxSizeBytes:  1000000,
+		BatchSubmitMaxTime: 30 * time.Minute,
+		MaxBatchSkew:       10,
 	}
 }
diff --git a/testutil/node.go b/testutil/node.go
index 0b8b9853c..01e166b9a 100644
--- a/testutil/node.go
+++ b/testutil/node.go
@@ -28,10 +28,10 @@ func CreateNode(isSequencer bool, blockManagerConfig *config.BlockManagerConfig)
 	if blockManagerConfig == nil {
 		blockManagerConfig = &config.BlockManagerConfig{
-			BlockTime:              100 * time.Millisecond,
-			BatchSubmitMaxTime:     60 * time.Second,
-			BlockBatchMaxSizeBytes: 1000,
-			MaxSupportedBatchSkew:  10,
+			BlockTime:          100 * time.Millisecond,
+			BatchSubmitMaxTime: 60 * time.Second,
+			BatchMaxSizeBytes:  1000,
+			MaxBatchSkew:       10,
 		}
 	}
 	nodeConfig.BlockManagerConfig = *blockManagerConfig
diff --git a/testutil/types.go b/testutil/types.go
index 50ab642ee..6b0e04354 100644
--- a/testutil/types.go
+++ b/testutil/types.go
@@ -5,9 +5,8 @@ import (
 	"math/big"
 	"time"
 
-	"github.com/libp2p/go-libp2p/core/crypto"
-
 	"github.com/dymensionxyz/dymint/types"
+	"github.com/libp2p/go-libp2p/core/crypto"
 	abci "github.com/tendermint/tendermint/abci/types"
 	"github.com/tendermint/tendermint/crypto/ed25519"
 	tmstate "github.com/tendermint/tendermint/proto/tendermint/state"
@@ -173,14 +172,35 @@ func GenerateBatch(startHeight uint64, endHeight uint64, proposerKey crypto.Priv
 		return nil, err
 	}
 	batch := &types.Batch{
-		StartHeight: startHeight,
-		EndHeight:   endHeight,
-		Blocks:      blocks,
-		Commits:     commits,
+		Blocks:  blocks,
+		Commits: commits,
 	}
 	return batch, nil
 }
 
+func MustGenerateBatch(startHeight uint64, endHeight uint64, proposerKey crypto.PrivKey) *types.Batch {
+	blocks, err := GenerateBlocks(startHeight, endHeight-startHeight+1, proposerKey)
+	if err != nil {
+		panic(err)
+	}
+	commits, err := GenerateCommits(blocks, proposerKey)
+	if err != nil {
+		panic(err)
+	}
+	return &types.Batch{
+		Blocks:  blocks,
+		Commits: commits,
+	}
+}
+
+func MustGenerateBatchAndKey(startHeight uint64, endHeight uint64) *types.Batch {
+	proposerKey, _, err := crypto.GenerateEd25519Key(nil)
+	if err != nil {
+		panic(err)
+	}
+	return MustGenerateBatch(startHeight, endHeight, proposerKey)
+}
+
 // GenerateRandomValidatorSet generates random validator sets
 func GenerateRandomValidatorSet() *tmtypes.ValidatorSet {
 	pubKey := ed25519.GenPrivKey().PubKey()
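Both GenerateBatch endpoints are inclusive (the helpers generate endHeight-startHeight+1 blocks), and the new Must* variants trade error returns for panics so that test setup stays flat. A usage sketch with arbitrary heights:

    // Five blocks, heights 3 through 7 inclusive.
    batch := testutil.MustGenerateBatch(3, 7, proposerKey)
    // batch.NumBlocks() == 5, batch.StartHeight() == 3, batch.EndHeight() == 7

    // The same, minting a throwaway key when the test does not care who signed.
    batch2 := testutil.MustGenerateBatchAndKey(3, 7)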
diff --git a/types/batch.go b/types/batch.go
index 0611fb7a7..90ac90af1 100644
--- a/types/batch.go
+++ b/types/batch.go
@@ -7,8 +7,45 @@ const (
 // Batch defines a struct for block aggregation for support of batching.
 // TODO: maybe change to BlockBatch
 type Batch struct {
-	StartHeight uint64
-	EndHeight   uint64
-	Blocks      []*Block
-	Commits     []*Commit
+	Blocks  []*Block
+	Commits []*Commit
+}
+
+// StartHeight is the height of the first block in the batch.
+func (b Batch) StartHeight() uint64 {
+	if len(b.Blocks) == 0 {
+		return 0
+	}
+	return b.Blocks[0].Header.Height
+}
+
+// EndHeight is the height of the last block in the batch.
+func (b Batch) EndHeight() uint64 {
+	if len(b.Blocks) == 0 {
+		return 0
+	}
+	return b.Blocks[len(b.Blocks)-1].Header.Height
+}
+
+// NumBlocks is the number of blocks in the batch.
+func (b Batch) NumBlocks() uint64 {
+	return uint64(len(b.Blocks))
+}
+
+// SizeBlockAndCommitBytes returns the sum of the sizes, in bytes, of the blocks and commits.
+// The actual size of the batch may differ due to additional metadata and protobuf
+// optimizations.
+func (b Batch) SizeBlockAndCommitBytes() int {
+	cnt := 0
+	for _, block := range b.Blocks {
+		cnt += block.SizeBytes()
+	}
+	for _, commit := range b.Commits {
+		cnt += commit.SizeBytes()
+	}
+	return cnt
+}
+
+func (b Batch) SizeBytes() int {
+	return b.ToProto().Size()
 }
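Two consequences of the derived accessors are worth spelling out. First, an empty batch answers 0 for both heights, which callers must treat as a sentinel rather than a genuine height. Second, the two size methods answer different questions: SizeBytes is the exact serialized size, including the protobuf framing and the Start/End metadata written by ToProto, while SizeBlockAndCommitBytes sums only the parts, which makes it the cheaper figure for accounting blocks before a batch is assembled. A sketch:

    batch := testutil.MustGenerateBatchAndKey(10, 11)

    _ = batch.StartHeight() // 10, read from Blocks[0]
    _ = batch.EndHeight()   // 11, read from Blocks[len(Blocks)-1]

    exact := batch.SizeBytes()                // what a DA layer actually stores
    approx := batch.SizeBlockAndCommitBytes() // blocks + commits only, typically a little smaller
    _, _ = exact, approx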
diff --git a/types/block.go b/types/block.go
index 8fdc1d0e4..542fd27fc 100644
--- a/types/block.go
+++ b/types/block.go
@@ -66,6 +66,10 @@ type Block struct {
 	LastCommit Commit
 }
 
+func (b Block) SizeBytes() int {
+	return b.ToProto().Size()
+}
+
 var (
 	_ encoding.BinaryMarshaler   = &Block{}
 	_ encoding.BinaryUnmarshaler = &Block{}
@@ -92,6 +96,10 @@ type Commit struct {
 	TMSignature tmtypes.CommitSig
 }
 
+func (c Commit) SizeBytes() int {
+	return c.ToProto().Size()
+}
+
 // Signature represents signature of block creator.
 type Signature []byte
 
diff --git a/types/errors.go b/types/errors.go
index 7c8154e25..ae7925b71 100644
--- a/types/errors.go
+++ b/types/errors.go
@@ -5,7 +5,7 @@ import "errors"
 var (
 	ErrInvalidSignature      = errors.New("invalid signature")
 	ErrNoStateFound          = errors.New("no state found")
-	ErrSkippedEmptyBlock     = errors.New("skipped empty block")
+	ErrEmptyBlock            = errors.New("block has no transactions and is not allowed to be empty")
 	ErrInvalidBlockHeight    = errors.New("invalid block height")
 	ErrInvalidHeaderDataHash = errors.New("header not matching block data")
 )
diff --git a/types/metrics.go b/types/metrics.go
index a53ff3c65..67dd729a9 100644
--- a/types/metrics.go
+++ b/types/metrics.go
@@ -24,3 +24,13 @@ var RollappBlockSizeTxsGauge = promauto.NewGauge(prometheus.GaugeOpts{
 	Name: "rollapp_block_size_txs",
 	Help: "Rollapp ",
 })
+
+var RollappPendingSubmissionsSkewNumBatches = promauto.NewGauge(prometheus.GaugeOpts{
+	Name: "rollapp_pending_submissions_skew_num_batches",
+	Help: "The number of batches which have been accumulated but not yet submitted.",
+})
+
+var RollappPendingSubmissionsSkewNumBytes = promauto.NewGauge(prometheus.GaugeOpts{
+	Name: "rollapp_pending_submissions_skew_num_bytes",
+	Help: "The number of bytes (of blocks and commits) which have been accumulated but not yet submitted.",
+})
diff --git a/types/serialization.go b/types/serialization.go
index 5cd637479..385bdde28 100644
--- a/types/serialization.go
+++ b/types/serialization.go
@@ -2,7 +2,9 @@ package types
 
 import (
 	"errors"
+	"fmt"
 
+	"github.com/dymensionxyz/gerr-cosmos/gerrc"
 	abci "github.com/tendermint/tendermint/abci/types"
 	prototypes "github.com/tendermint/tendermint/proto/tendermint/types"
 	"github.com/tendermint/tendermint/types"
@@ -158,8 +160,8 @@ func (b *Block) ToProto() *pb.Block {
 // ToProto converts Batch into protobuf representation and returns it.
 func (b *Batch) ToProto() *pb.Batch {
 	return &pb.Batch{
-		StartHeight: b.StartHeight,
-		EndHeight:   b.EndHeight,
+		StartHeight: b.StartHeight(),
+		EndHeight:   b.EndHeight(),
 		Blocks:      blocksToProto(b.Blocks),
 		Commits:     commitsToProto(b.Commits),
 	}
@@ -195,8 +197,15 @@ func (b *Block) FromProto(other *pb.Block) error {
 // FromProto fills Batch with data from its protobuf representation.
 func (b *Batch) FromProto(other *pb.Batch) error {
-	b.StartHeight = other.StartHeight
-	b.EndHeight = other.EndHeight
+	n := len(other.Blocks)
+	start := other.StartHeight
+	end := other.EndHeight
+	if 0 < n && start != other.Blocks[0].Header.GetHeight() {
+		return fmt.Errorf("start height does not match first block height: %w", gerrc.ErrInvalidArgument)
+	}
+	if 0 < n && end != other.Blocks[n-1].Header.GetHeight() {
+		return fmt.Errorf("end height does not match last block height: %w", gerrc.ErrInvalidArgument)
+	}
 	b.Blocks = protoToBlocks(other.Blocks)
 	b.Commits = protoToCommits(other.Commits)
 	return nil
 }
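FromProto no longer copies the claimed StartHeight/EndHeight into the struct; since the type now derives its range from content, the wire fields are only checked, and a mismatch is rejected with gerrc.ErrInvalidArgument. A sketch of the failure mode:

    pb := batch.ToProto()
    pb.EndHeight = 999 // tamper with the claimed range

    var out types.Batch
    err := out.FromProto(pb)
    // err wraps gerrc.ErrInvalidArgument:
    // "end height does not match last block height"
    fmt.Println(errors.Is(err, gerrc.ErrInvalidArgument)) // true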
diff --git a/utils/channel/funcs.go b/utils/channel/funcs.go
new file mode 100644
index 000000000..614414a3f
--- /dev/null
+++ b/utils/channel/funcs.go
@@ -0,0 +1,32 @@
+package channel
+
+// DrainForever will drain the channels in separate goroutines in a loop, forever.
+// Intended for tests only.
+func DrainForever[T any](chs ...<-chan T) {
+	for _, ch := range chs {
+		ch := ch // pin the loop variable for toolchains before Go 1.22
+		go func() {
+			for {
+				<-ch
+			}
+		}()
+	}
+}
+
+// Nudger can be used to make a goroutine ('A') sleep, and have another goroutine ('B') wake it up.
+// B will not block if A is not asleep.
+type Nudger struct {
+	C chan struct{} // Receive on C to sleep
+}
+
+func NewNudger() *Nudger {
+	return &Nudger{make(chan struct{})}
+}
+
+// Nudge wakes up the waiting thread, if any. Non-blocking.
+func (w Nudger) Nudge() {
+	select {
+	case w.C <- struct{}{}:
+	default:
+	}
+}
diff --git a/utils/event/funcs.go b/utils/event/funcs.go
index 0bfe9c965..ee925e07a 100644
--- a/utils/event/funcs.go
+++ b/utils/event/funcs.go
@@ -2,6 +2,7 @@ package event
 
 import (
 	"context"
+	"errors"
 	"fmt"
 
 	"github.com/dymensionxyz/dymint/types"
@@ -13,6 +14,7 @@ import (
 
 // MustSubscribe subscribes to events and sends back a callback
 // clientID is essentially the subscriber id, see https://pkg.go.dev/github.com/tendermint/tendermint/libs/pubsub#pkg-overview
+// - will not panic on context cancel
 func MustSubscribe(
 	ctx context.Context,
 	pubsubServer *pubsub.Server,
@@ -22,7 +24,7 @@ func MustSubscribe(
 	logger types.Logger,
 ) {
 	subscription, err := pubsubServer.SubscribeUnbuffered(ctx, clientID, eventQuery)
-	if err != nil {
+	if err != nil && !errors.Is(err, context.Canceled) {
 		logger.Error("subscribe to events")
 		panic(err)
 	}
@@ -40,10 +42,10 @@ func MustSubscribe(
 	}
 }
 
-// MustPublish submits an event or panics
+// MustPublish submits an event or panics - will not panic on context cancel
 func MustPublish(ctx context.Context, pubsubServer *pubsub.Server, msg interface{}, events map[string][]string) {
 	err := pubsubServer.PublishWithEvents(ctx, msg, events)
-	if err != nil {
+	if err != nil && !errors.Is(err, context.Canceled) {
 		panic(err)
 	}
 }
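The channel utilities back the new produce/submit coupling: DrainForever keeps unused outputs from blocking a test, and Nudger is a one-slot, lossy wake-up whose sender never blocks. A usage sketch (the goroutine roles are illustrative):

    nudger := channel.NewNudger()

    // 'A', e.g. the submitter: sleep until nudged.
    go func() {
        for range nudger.C {
            // wake up and do a unit of work, e.g. submit a batch
        }
    }()

    // 'B', e.g. the producer: wake A if it is asleep. Never blocks,
    // and repeated nudges coalesce while A is busy.
    nudger.Nudge()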