diff --git a/block/manager.go b/block/manager.go
index 2d97afe5d..7761c4aa8 100644
--- a/block/manager.go
+++ b/block/manager.go
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"sync"
 	"sync/atomic"
+	"time"

 	"code.cloudfoundry.org/go-diodes"

@@ -70,10 +71,11 @@ type Manager struct {
 	shouldProduceBlocksCh chan bool
 	produceEmptyBlockCh   chan bool

-	syncTarget         uint64
-	lastSubmissionTime int64
-	batchInProcess     atomic.Value
-	isSyncedCond       sync.Cond
+	syncTarget         uint64
+	lastSubmissionTime int64
+	batchInProcess     atomic.Value
+	isSyncedCond       sync.Cond
+	produceBlockMutex  sync.Mutex

 	applyCachedBlockMutex sync.Mutex

@@ -182,20 +184,62 @@ func NewManager(
 // Start starts the block manager.
 func (m *Manager) Start(ctx context.Context, isAggregator bool) error {
 	m.logger.Info("Starting the block manager")
+
+	err := m.syncBlockManager(ctx)
+	if err != nil {
+		err = fmt.Errorf("failed to sync block manager: %w", err)
+		return err
+	}
+
 	if isAggregator {
 		m.logger.Info("Starting in aggregator mode")
 		// TODO(omritoptix): change to private methods
 		go m.ProduceBlockLoop(ctx)
 		go m.SubmitLoop(ctx)
+	} else {
+		// TODO(omritoptix): change to private methods
+		go m.RetriveLoop(ctx)
+		go m.SyncTargetLoop(ctx)
 	}
-	// TODO(omritoptix): change to private methods
-	go m.RetriveLoop(ctx)
-	go m.SyncTargetLoop(ctx)
-	m.EventListener(ctx)
+
+	m.EventListener(ctx, isAggregator)

 	return nil
 }

+// syncBlockManager enforces the node to be synced on initial run.
+func (m *Manager) syncBlockManager(ctx context.Context) error {
+	resultRetrieveBatch, err := m.getLatestBatchFromSL(ctx)
+	// Set the syncTarget according to the result
+	if err != nil {
+		// TODO: distinguish between a fresh rollapp and a non-registered rollapp
+		if err == settlement.ErrBatchNotFound {
+			// Since we requested the latest batch and got batch not found it means
+			// the SL still hasn't got any batches for this chain.
+			m.logger.Info("No batches for chain found in SL. Start writing first batch")
+			atomic.StoreUint64(&m.syncTarget, uint64(m.genesis.InitialHeight-1))
+			return nil
+		}
+		return err
+	}
+	atomic.StoreUint64(&m.syncTarget, resultRetrieveBatch.EndHeight)
+	err = m.syncUntilTarget(ctx, resultRetrieveBatch.EndHeight)
+	if err != nil {
+		return err
+	}
+
+	m.logger.Info("Synced", "current height", m.store.Height(), "syncTarget", atomic.LoadUint64(&m.syncTarget))
+	return nil
+}
+
+// updateSyncParams updates the sync target and state index if necessary
+func (m *Manager) updateSyncParams(endHeight uint64) {
+	rollappHubHeightGauge.Set(float64(endHeight))
+	m.logger.Info("Received new syncTarget", "syncTarget", endHeight)
+	atomic.StoreUint64(&m.syncTarget, endHeight)
+	atomic.StoreInt64(&m.lastSubmissionTime, time.Now().UnixNano())
+}
+
 func getAddress(key crypto.PrivKey) ([]byte, error) {
 	rawKey, err := key.GetPublic().Raw()
 	if err != nil {
@@ -205,9 +249,11 @@ func getAddress(key crypto.PrivKey) ([]byte, error) {
 }

 // EventListener registers events to callbacks.
-func (m *Manager) EventListener(ctx context.Context) {
+func (m *Manager) EventListener(ctx context.Context, isAggregator bool) {
 	go utils.SubscribeAndHandleEvents(ctx, m.pubsub, "nodeHealthStatusHandler", events.EventQueryHealthStatus, m.healthStatusEventCallback, m.logger)
-	go utils.SubscribeAndHandleEvents(ctx, m.pubsub, "ApplyBlockLoop", p2p.EventQueryNewNewGossipedBlock, m.applyBlockCallback, m.logger, 100)
+	if !isAggregator {
+		go utils.SubscribeAndHandleEvents(ctx, m.pubsub, "ApplyBlockLoop", p2p.EventQueryNewNewGossipedBlock, m.applyBlockCallback, m.logger, 100)
+	}
 }
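The core of the manager.go change is the startup ordering: sync to the settlement-layer target first, then launch only the loops that match the node's role. A minimal sketch of that pattern, with hypothetical stand-ins for the real Manager methods (only the ordering and the atomic `syncTarget` handling mirror the diff):

```go
// Sketch of the sync-then-start flow; `node`, `sync`, and the loop names are
// illustrative, not the real dymint types.
package main

import (
	"context"
	"fmt"
	"sync/atomic"
)

type node struct {
	syncTarget uint64 // written with atomic.StoreUint64, read with atomic.LoadUint64
	height     uint64
}

func (n *node) sync(ctx context.Context, target uint64) error {
	atomic.StoreUint64(&n.syncTarget, target)
	for n.height < atomic.LoadUint64(&n.syncTarget) {
		n.height++ // stand-in for applying one retrieved block
	}
	return nil
}

func (n *node) start(ctx context.Context, isAggregator bool) error {
	// Sync first; only a synced node may produce or retrieve blocks.
	if err := n.sync(ctx, 42); err != nil {
		return fmt.Errorf("failed to sync: %w", err)
	}
	if isAggregator {
		// go n.produceBlockLoop(ctx); go n.submitLoop(ctx)
	} else {
		// go n.retrieveLoop(ctx); go n.syncTargetLoop(ctx)
	}
	return nil
}

func main() {
	n := &node{}
	fmt.Println(n.start(context.Background(), true), n.height) // <nil> 42
}
```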
diff --git a/block/manager_test.go b/block/manager_test.go
index 5fed0ae17..d61a46367 100644
--- a/block/manager_test.go
+++ b/block/manager_test.go
@@ -121,9 +121,9 @@ func TestProduceOnlyAfterSynced(t *testing.T) {
 	require.NotNil(t, manager)

 	t.Log("Taking the manager out of sync by submitting a batch")
-	lastStoreHeight := manager.store.Height()
+	syncTarget := atomic.LoadUint64(&manager.syncTarget)
 	numBatchesToAdd := 2
-	nextBatchStartHeight := atomic.LoadUint64(&manager.syncTarget) + 1
+	nextBatchStartHeight := syncTarget + 1
 	var batch *types.Batch
 	for i := 0; i < numBatchesToAdd; i++ {
 		batch, err = testutil.GenerateBatch(nextBatchStartHeight, nextBatchStartHeight+uint64(defaultBatchSize-1), manager.proposerKey)
@@ -136,30 +136,17 @@ func TestProduceOnlyAfterSynced(t *testing.T) {
 		time.Sleep(time.Millisecond * 500)
 	}

-	t.Log("Validating manager can't produce blocks")
-	ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
-	defer cancel()
-	go manager.ProduceBlockLoop(ctx)
-	<-ctx.Done()
-	assert.Equal(t, lastStoreHeight, manager.store.Height())
-	// Wait until produce block loop is done
-	time.Sleep(time.Second * 1)
-
-	t.Log("Sync the manager")
-	ctx, cancel = context.WithTimeout(context.Background(), time.Second*2)
-	defer cancel()
-	go manager.Start(ctx, false)
-	<-ctx.Done()
-	require.Greater(t, manager.store.Height(), lastStoreHeight)
-	assert.Equal(t, batch.EndHeight, manager.store.Height())
-	// Wait until manager is done
-	time.Sleep(time.Second * 4)
+	// Initially the sync target is 0
+	assert.True(t, manager.syncTarget == 0)
+	assert.True(t, manager.store.Height() == 0)

-	t.Log("Validate blocks are produced")
-	ctx, cancel = context.WithTimeout(context.Background(), time.Second*3)
+	// Enough time to sync and produce blocks
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second*4)
 	defer cancel()
-	go manager.ProduceBlockLoop(ctx)
+	go manager.Start(ctx, true)
 	<-ctx.Done()
+	assert.True(t, manager.syncTarget == batch.EndHeight)
+	// Validate that we produced blocks
 	assert.Greater(t, manager.store.Height(), batch.EndHeight)
 }
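One caveat worth noting about the rewritten test: after `<-ctx.Done()` the goroutines launched by `Start` may still be shutting down, so the direct reads of `manager.syncTarget` can race with the writers, which use `atomic.StoreUint64`. A race-detector-safe variant of the final assertions (a suggestion, not part of this diff) would load the field the same way the production code writes it:

```go
// Race-free read of the shared sync target in a test; the writer side
// (Manager.updateSyncParams) uses atomic.StoreUint64.
got := atomic.LoadUint64(&manager.syncTarget)
assert.Equal(t, batch.EndHeight, got)
```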
diff --git a/block/produce.go b/block/produce.go
index ad6458830..540ab55f5 100644
--- a/block/produce.go
+++ b/block/produce.go
@@ -3,12 +3,9 @@ package block
 import (
 	"context"
 	"fmt"
-	"sync/atomic"
 	"time"

-	"cosmossdk.io/errors"
 	abciconv "github.com/dymensionxyz/dymint/conv/abci"
-	"github.com/dymensionxyz/dymint/settlement"
 	"github.com/dymensionxyz/dymint/types"
 	tmed25519 "github.com/tendermint/tendermint/crypto/ed25519"
 	cmtproto "github.com/tendermint/tendermint/proto/tendermint/types"
@@ -16,47 +13,9 @@ import (
 	tmtime "github.com/tendermint/tendermint/types/time"
 )

-// waitForSync enforces the aggregator to be synced before it can produce blocks.
-// It requires the retriveBlockLoop to be running.
-func (m *Manager) waitForSync(ctx context.Context) error {
-	resultRetrieveBatch, err := m.getLatestBatchFromSL(ctx)
-	// Set the syncTarget according to the result
-	if err == settlement.ErrBatchNotFound {
-		// Since we requested the latest batch and got batch not found it means
-		// the SL still hasn't got any batches for this chain.
-		m.logger.Info("No batches for chain found in SL. Start writing first batch")
-		atomic.StoreUint64(&m.syncTarget, uint64(m.genesis.InitialHeight-1))
-		return nil
-	} else if err != nil {
-		m.logger.Error("failed to retrieve batch from SL", "err", err)
-		return err
-	} else {
-		m.updateSyncParams(ctx, resultRetrieveBatch.EndHeight)
-	}
-	// Wait until isSynced is true and then call the PublishBlockLoop
-	m.isSyncedCond.L.Lock()
-	// Wait until we're synced and that we have got the latest batch (if we didn't, m.syncTarget == 0)
-	// before we start publishing blocks
-	for m.store.Height() < atomic.LoadUint64(&m.syncTarget) {
-		m.logger.Info("Waiting for sync", "current height", m.store.Height(), "syncTarget", atomic.LoadUint64(&m.syncTarget))
-		m.isSyncedCond.Wait()
-	}
-	m.isSyncedCond.L.Unlock()
-	m.logger.Info("Synced, Starting to produce", "current height", m.store.Height(), "syncTarget", atomic.LoadUint64(&m.syncTarget))
-	return nil
-}
-
 // ProduceBlockLoop is calling publishBlock in a loop as long as we're synced.
 func (m *Manager) ProduceBlockLoop(ctx context.Context) {
-	atomic.StoreInt64(&m.lastSubmissionTime, time.Now().Unix())
-
-	// We want to wait until we are synced. After that, since there is no leader
-	// election yet, and leader are elected manually, we will not be out of sync until
-	// we are manually being replaced.
-	err := m.waitForSync(ctx)
-	if err != nil {
-		panic(errors.Wrap(err, "failed to wait for sync"))
-	}
+	m.logger.Debug("Started produce loop")

 	ticker := time.NewTicker(m.conf.BlockTime)
 	defer ticker.Stop()
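With `waitForSync` gone, `ProduceBlockLoop` reduces to the standard ticker-driven loop shape: tick at the configured block time and exit promptly on context cancellation. A self-contained sketch of that shape, where `produceBlock` is a hypothetical stand-in for the real publish path:

```go
// Minimal ticker loop with context cancellation, mirroring the structure of
// ProduceBlockLoop after this change. Errors are logged, not panicked on.
package main

import (
	"context"
	"fmt"
	"time"
)

func produceLoop(ctx context.Context, blockTime time.Duration, produceBlock func() error) {
	ticker := time.NewTicker(blockTime)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if err := produceBlock(); err != nil {
				fmt.Println("produce error:", err) // keep looping rather than crash
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	n := 0
	produceLoop(ctx, 10*time.Millisecond, func() error { n++; return nil })
	fmt.Println("blocks produced:", n)
}
```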
diff --git a/block/production_test.go b/block/production_test.go
index 8f0cadc6b..78de797e9 100644
--- a/block/production_test.go
+++ b/block/production_test.go
@@ -3,6 +3,7 @@ package block
 import (
 	"context"
 	"fmt"
+	"sync"
 	"testing"
 	"time"

@@ -12,6 +13,7 @@ import (
 	"github.com/dymensionxyz/dymint/config"
 	"github.com/dymensionxyz/dymint/mempool"
 	mempoolv1 "github.com/dymensionxyz/dymint/mempool/v1"
+	"github.com/dymensionxyz/dymint/types"
 	tmcfg "github.com/tendermint/tendermint/config"

 	"github.com/dymensionxyz/dymint/testutil"
@@ -193,11 +195,63 @@ func TestBatchSubmissionAfterTimeout(t *testing.T) {
 	require.Equal(initialHeight, manager.store.Height())
 	require.True(manager.batchInProcess.Load() == false)

+	require.True(manager.syncTarget == 0)
+
+	var wg sync.WaitGroup
 	mCtx, cancel := context.WithTimeout(context.Background(), runTime)
 	defer cancel()
-	go manager.ProduceBlockLoop(mCtx)
-	go manager.SubmitLoop(mCtx)
+
+	wg.Add(2) // Add 2 because we have 2 goroutines
+
+	go func() {
+		defer wg.Done() // Decrease counter when this goroutine finishes
+		manager.ProduceBlockLoop(mCtx)
+	}()
+
+	go func() {
+		defer wg.Done() // Decrease counter when this goroutine finishes
+		manager.SubmitLoop(mCtx)
+	}()
+
 	<-mCtx.Done()
+	wg.Wait() // Wait for all goroutines to finish
+	require.True(manager.syncTarget > 0)
+}
+
+func TestInvalidBatch(t *testing.T) {
+	assert := assert.New(t)
+	require := require.New(t)
+
+	manager, err := getManager(getManagerConfig(), nil, nil, 1, 1, 0, nil, nil)
+	require.NoError(err)

-	require.True(manager.batchInProcess.Load() == true)
+	batchSize := uint64(5)
+	syncTarget := uint64(10)
+
+	// Create cases
+	cases := []struct {
+		startHeight uint64
+		endHeight   uint64
+		shouldError bool
+	}{
+		{startHeight: syncTarget + 1, endHeight: syncTarget + batchSize, shouldError: false},
+		// batch with endHeight < startHeight
+		{startHeight: syncTarget + 1, endHeight: syncTarget, shouldError: true},
+		// batch with startHeight != previousEndHeight + 1
+		{startHeight: syncTarget, endHeight: syncTarget + batchSize + batchSize, shouldError: true},
+	}
+	for _, c := range cases {
+		batch := &types.Batch{
+			StartHeight: c.startHeight,
+			EndHeight:   c.endHeight,
+		}
+
+		manager.updateSyncParams(syncTarget)
+		err := manager.validateBatch(batch)
+		if c.shouldError {
+			assert.Error(err)
+		} else {
+			assert.NoError(err)
+		}
+	}
 }
diff --git a/block/retriever.go b/block/retriever.go
index 3de011ca4..a10d16fd7 100644
--- a/block/retriever.go
+++ b/block/retriever.go
@@ -13,7 +13,8 @@ import (
 // runs syncUntilTarget on the latest message in the ring buffer.
 func (m *Manager) RetriveLoop(ctx context.Context) {
 	m.logger.Info("Started retrieve loop")
-	syncTargetpoller := diodes.NewPoller(m.syncTargetDiode)
+	syncTargetpoller := diodes.NewPoller(m.syncTargetDiode, diodes.WithPollingContext(ctx))
+
 	for {
 		select {
 		case <-ctx.Done():
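The retriever change wires the poller to the loop's context, so `Next()` stops blocking once the node shuts down. A sketch of the diode pattern behind `syncTargetDiode`, assuming the `code.cloudfoundry.org/go-diodes` API as used in this diff (a newer sync target overwrites an older unread one; the alerter reports how many were dropped):

```go
// One-slot diode: writer keeps only the latest sync target, reader polls it
// and stops when the context is canceled.
package main

import (
	"context"
	"fmt"
	"time"

	"code.cloudfoundry.org/go-diodes"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	d := diodes.NewOneToOne(1, diodes.AlertFunc(func(missed int) {
		fmt.Println("dropped stale targets:", missed)
	}))
	poller := diodes.NewPoller(d, diodes.WithPollingContext(ctx))

	// Writer side (SyncTargetLoop): only the newest target matters.
	for _, h := range []uint64{10, 11, 12} {
		h := h
		d.Set(diodes.GenericDataType(&h))
	}

	// Reader side (RetriveLoop): Next blocks until data arrives or ctx is done.
	if data := poller.Next(); data != nil {
		fmt.Println("sync until target:", *(*uint64)(data))
	}
}
```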
diff --git a/block/submit.go b/block/submit.go
index 423ed19fe..cd4b0415d 100644
--- a/block/submit.go
+++ b/block/submit.go
@@ -2,6 +2,7 @@ package block

 import (
 	"context"
+	"fmt"
 	"sync/atomic"
 	"time"

@@ -25,7 +26,7 @@ func (m *Manager) SubmitLoop(ctx context.Context) {
 			syncTarget := atomic.LoadUint64(&m.syncTarget)
 			height := m.store.Height()
 			//no new blocks produced yet
-			if (height - syncTarget) == 0 {
+			if height <= syncTarget {
 				continue
 			}

@@ -41,20 +42,29 @@ func (m *Manager) SubmitLoop(ctx context.Context) {
 			if err != nil {
 				m.logger.Error("error while producing empty block", "error", err)
 			}
-			m.submitNextBatch(ctx)
+
+			syncHeight, err := m.submitNextBatch(ctx)
+			if err != nil {
+				m.logger.Error("error while submitting next batch", "error", err)
+				continue
+			}
+
+			// Update the syncTarget to the height of the last block in the last batch as seen by this node.
+			m.batchInProcess.Store(false)
+			m.updateSyncParams(syncHeight)
 		}
 	}
 }

-func (m *Manager) submitNextBatch(ctx context.Context) {
+func (m *Manager) submitNextBatch(ctx context.Context) (uint64, error) {
 	// Get the batch start and end height
 	startHeight := atomic.LoadUint64(&m.syncTarget) + 1
-	endHeight := uint64(m.lastState.LastBlockHeight)
+	endHeight := uint64(m.store.Height())

 	isLastBlockEmpty, err := m.validateLastBlockInBatchIsEmpty(startHeight, endHeight)
 	if err != nil {
 		m.logger.Error("Failed to validate last block in batch is empty", "startHeight", startHeight, "endHeight", endHeight, "error", err)
-		return
+		return 0, err
 	}
 	if !isLastBlockEmpty {
 		m.logger.Info("Requesting for an empty block creation")
@@ -65,7 +75,11 @@ func (m *Manager) submitNextBatch(ctx context.Context) {
 	nextBatch, err := m.createNextDABatch(startHeight, endHeight)
 	if err != nil {
 		m.logger.Error("Failed to create next batch", "startHeight", startHeight, "endHeight", endHeight, "error", err)
-		return
+		return 0, err
+	}
+
+	if err := m.validateBatch(nextBatch); err != nil {
+		return 0, err
 	}

 	actualEndHeight := nextBatch.EndHeight
@@ -74,13 +88,32 @@ func (m *Manager) submitNextBatch(ctx context.Context) {
 	m.logger.Info("Submitting next batch", "startHeight", startHeight, "endHeight", actualEndHeight, "size", nextBatch.ToProto().Size())
 	resultSubmitToDA := m.dalc.SubmitBatch(nextBatch)
 	if resultSubmitToDA.Code != da.StatusSuccess {
-		panic("Failed to submit next batch to DA Layer")
+		err = fmt.Errorf("failed to submit next batch to DA Layer: %s", resultSubmitToDA.Message)
+		return 0, err
 	}

 	// Submit batch to SL
 	// TODO(omritoptix): Handle a case where the SL submission fails due to syncTarget out of sync with the latestHeight in the SL.
 	// In that case we'll want to update the syncTarget before returning.
-	m.settlementClient.SubmitBatch(nextBatch, m.dalc.GetClientType(), &resultSubmitToDA)
+
+	err = m.settlementClient.SubmitBatch(nextBatch, m.dalc.GetClientType(), &resultSubmitToDA)
+	if err != nil {
+		m.logger.Error("Failed to submit batch to SL", "startHeight", startHeight, "endHeight", actualEndHeight, "error", err)
+		return 0, err
+	}
+
+	return actualEndHeight, nil
+}
+
+func (m *Manager) validateBatch(batch *types.Batch) error {
+	syncTarget := atomic.LoadUint64(&m.syncTarget)
+	if batch.StartHeight != syncTarget+1 {
+		return fmt.Errorf("batch start height != syncTarget + 1. StartHeight %d, m.syncTarget %d", batch.StartHeight, syncTarget)
+	}
+	if batch.EndHeight < batch.StartHeight {
+		return fmt.Errorf("batch end height must be greater than start height. EndHeight %d, StartHeight %d", batch.EndHeight, batch.StartHeight)
+	}
+	return nil
 }

 func (m *Manager) createNextDABatch(startHeight uint64, endHeight uint64) (*types.Batch, error) {
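The `validateBatch` invariants moved here from `settlement/base.go` are simple: a batch must start exactly at `syncTarget+1` and must not end before it starts. A standalone check of those two rules, mirroring the cases in `TestInvalidBatch` (types and helper names are illustrative):

```go
// Quick demonstration of the batch-contiguity invariants enforced above.
package main

import "fmt"

type batch struct{ start, end uint64 }

func validate(b batch, syncTarget uint64) error {
	if b.start != syncTarget+1 {
		return fmt.Errorf("batch start height != syncTarget + 1. StartHeight %d, syncTarget %d", b.start, syncTarget)
	}
	if b.end < b.start {
		return fmt.Errorf("batch end height must be greater than start height. EndHeight %d, StartHeight %d", b.end, b.start)
	}
	return nil
}

func main() {
	const syncTarget = 10
	fmt.Println(validate(batch{11, 15}, syncTarget)) // <nil>
	fmt.Println(validate(batch{11, 10}, syncTarget)) // ends before it starts
	fmt.Println(validate(batch{10, 20}, syncTarget)) // overlaps the previous batch
}
```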
diff --git a/block/synctarget.go b/block/synctarget.go
index a7f81b457..476554327 100644
--- a/block/synctarget.go
+++ b/block/synctarget.go
@@ -2,8 +2,6 @@ package block

 import (
 	"context"
-	"sync/atomic"
-	"time"

 	"code.cloudfoundry.org/go-diodes"
 	"github.com/dymensionxyz/dymint/settlement"
@@ -11,47 +9,29 @@ import (

 // SyncTargetLoop is responsible for getting real time updates about batches submission.
 // for non aggregator: updating the sync target which will be used by retrieveLoop to sync until this target.
-// for aggregator: get notification that batch has been accepted so can send next batch.
 func (m *Manager) SyncTargetLoop(ctx context.Context) {
 	m.logger.Info("Started sync target loop")
-	subscription, err := m.pubsub.Subscribe(ctx, "syncTargetLoop", settlement.EventQueryNewBatchAccepted)
+	subscription, err := m.pubsub.Subscribe(ctx, "syncTargetLoop", settlement.EventQueryNewSettlementBatchAccepted)
 	if err != nil {
 		m.logger.Error("failed to subscribe to state update events")
 		panic(err)
 	}

-	// First time we start we want to get the latest batch from the SL
-	resultRetrieveBatch, err := m.getLatestBatchFromSL(ctx)
-	if err != nil {
-		m.logger.Error("failed to retrieve batch from SL", "err", err)
-	} else {
-		m.updateSyncParams(ctx, resultRetrieveBatch.EndHeight)
-	}
+
 	for {
 		select {
 		case <-ctx.Done():
 			return
 		case event := <-subscription.Out():
-			eventData := event.Data().(*settlement.EventDataNewBatchAccepted)
-			m.updateSyncParams(ctx, eventData.EndHeight)
-			// In case we are the aggregator and we've got an update, then we can stop blocking from
-			// the next batches to be published. For non-aggregators this is not needed.
-			// We only want to send the next once the previous has been published successfully.
-			// TODO(omritoptix): Once we have leader election, we can add a condition.
-			// Update batch accepted is only relevant for the aggregator
-			// TODO(omritoptix): Check if we are the aggregator
-			m.batchInProcess.Store(false)
+			eventData := event.Data().(*settlement.EventDataNewSettlementBatchAccepted)
+			if eventData.EndHeight <= m.store.Height() {
+				m.logger.Error("syncTargetLoop: event is old, skipping")
+				continue
+			}
+			m.updateSyncParams(eventData.EndHeight)
+			m.syncTargetDiode.Set(diodes.GenericDataType(&eventData.EndHeight))
 		case <-subscription.Cancelled():
 			m.logger.Info("syncTargetLoop subscription canceled")
 			return
 		}
 	}
 }
-
-// updateSyncParams updates the sync target and state index if necessary
-func (m *Manager) updateSyncParams(ctx context.Context, endHeight uint64) {
-	rollappHubHeightGauge.Set(float64(endHeight))
-	m.logger.Info("Received new syncTarget", "syncTarget", endHeight)
-	atomic.StoreUint64(&m.syncTarget, endHeight)
-	atomic.StoreInt64(&m.lastSubmissionTime, time.Now().UnixNano())
-	m.syncTargetDiode.Set(diodes.GenericDataType(&endHeight))
-}
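The rewritten loop adds one guard that is easy to miss: events whose `EndHeight` is not above the current store height are dropped as stale, so a replayed or out-of-order acceptance event cannot move the sync target backwards. The filtering logic in isolation (the slice stands in for the pubsub subscription; names are illustrative):

```go
// Stale-event filtering as done in SyncTargetLoop: only targets above the
// current height advance the state.
package main

import "fmt"

func main() {
	events := []uint64{5, 3, 8, 8, 12} // EndHeight values as they arrive
	height := uint64(4)                // current store height

	for _, endHeight := range events {
		if endHeight <= height {
			fmt.Println("event is old, skipping:", endHeight)
			continue
		}
		height = endHeight // stand-in for updateSyncParams + diode Set
		fmt.Println("new syncTarget:", endHeight)
	}
}
```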
diff --git a/mocks/settlement/hubclient.go b/mocks/settlement/hubclient.go
index 47df4158b..31cf0cb0d 100644
--- a/mocks/settlement/hubclient.go
+++ b/mocks/settlement/hubclient.go
@@ -1,4 +1,4 @@
-// Code generated by mockery v2.15.0. DO NOT EDIT.
+// Code generated by mockery v2.39.1. DO NOT EDIT.

 package mocks

@@ -20,7 +20,15 @@ type HubClient struct {
 func (_m *HubClient) GetBatchAtIndex(rollappID string, index uint64) (*settlement.ResultRetrieveBatch, error) {
 	ret := _m.Called(rollappID, index)

+	if len(ret) == 0 {
+		panic("no return value specified for GetBatchAtIndex")
+	}
+
 	var r0 *settlement.ResultRetrieveBatch
+	var r1 error
+	if rf, ok := ret.Get(0).(func(string, uint64) (*settlement.ResultRetrieveBatch, error)); ok {
+		return rf(rollappID, index)
+	}
 	if rf, ok := ret.Get(0).(func(string, uint64) *settlement.ResultRetrieveBatch); ok {
 		r0 = rf(rollappID, index)
 	} else {
@@ -29,7 +37,6 @@ func (_m *HubClient) GetBatchAtIndex(rollappID string, index uint64) (*settlemen
 		}
 	}

-	var r1 error
 	if rf, ok := ret.Get(1).(func(string, uint64) error); ok {
 		r1 = rf(rollappID, index)
 	} else {
@@ -43,7 +50,15 @@ func (_m *HubClient) GetBatchAtIndex(rollappID string, index uint64) (*settlemen
 func (_m *HubClient) GetLatestBatch(rollappID string) (*settlement.ResultRetrieveBatch, error) {
 	ret := _m.Called(rollappID)

+	if len(ret) == 0 {
+		panic("no return value specified for GetLatestBatch")
+	}
+
 	var r0 *settlement.ResultRetrieveBatch
+	var r1 error
+	if rf, ok := ret.Get(0).(func(string) (*settlement.ResultRetrieveBatch, error)); ok {
+		return rf(rollappID)
+	}
 	if rf, ok := ret.Get(0).(func(string) *settlement.ResultRetrieveBatch); ok {
 		r0 = rf(rollappID)
 	} else {
@@ -52,7 +67,6 @@ func (_m *HubClient) GetLatestBatch(rollappID string) (*settlement.ResultRetriev
 		}
 	}

-	var r1 error
 	if rf, ok := ret.Get(1).(func(string) error); ok {
 		r1 = rf(rollappID)
 	} else {
@@ -66,7 +80,15 @@ func (_m *HubClient) GetLatestBatch(rollappID string) (*settlement.ResultRetriev
 func (_m *HubClient) GetSequencers(rollappID string) ([]*types.Sequencer, error) {
 	ret := _m.Called(rollappID)

+	if len(ret) == 0 {
+		panic("no return value specified for GetSequencers")
+	}
+
 	var r0 []*types.Sequencer
+	var r1 error
+	if rf, ok := ret.Get(0).(func(string) ([]*types.Sequencer, error)); ok {
+		return rf(rollappID)
+	}
 	if rf, ok := ret.Get(0).(func(string) []*types.Sequencer); ok {
 		r0 = rf(rollappID)
 	} else {
@@ -75,7 +97,6 @@ func (_m *HubClient) GetSequencers(rollappID string) ([]*types.Sequencer, error)
 		}
 	}

-	var r1 error
 	if rf, ok := ret.Get(1).(func(string) error); ok {
 		r1 = rf(rollappID)
 	} else {
@@ -86,14 +107,31 @@ func (_m *HubClient) GetSequencers(rollappID string) ([]*types.Sequencer, error)
 }

 // PostBatch provides a mock function with given fields: batch, daClient, daResult
-func (_m *HubClient) PostBatch(batch *types.Batch, daClient da.Client, daResult *da.ResultSubmitBatch) {
-	_m.Called(batch, daClient, daResult)
+func (_m *HubClient) PostBatch(batch *types.Batch, daClient da.Client, daResult *da.ResultSubmitBatch) error {
+	ret := _m.Called(batch, daClient, daResult)
+
+	if len(ret) == 0 {
+		panic("no return value specified for PostBatch")
+	}
+
+	var r0 error
+	if rf, ok := ret.Get(0).(func(*types.Batch, da.Client, *da.ResultSubmitBatch) error); ok {
+		r0 = rf(batch, daClient, daResult)
+	} else {
+		r0 = ret.Error(0)
+	}
+
+	return r0
 }

 // Start provides a mock function with given fields:
 func (_m *HubClient) Start() error {
 	ret := _m.Called()

+	if len(ret) == 0 {
+		panic("no return value specified for Start")
+	}
+
 	var r0 error
 	if rf, ok := ret.Get(0).(func() error); ok {
 		r0 = rf()
@@ -108,6 +146,10 @@ func (_m *HubClient) Start() error {
 func (_m *HubClient) Stop() error {
 	ret := _m.Called()

+	if len(ret) == 0 {
+		panic("no return value specified for Stop")
+	}
+
 	var r0 error
 	if rf, ok := ret.Get(0).(func() error); ok {
 		r0 = rf()
@@ -118,13 +160,12 @@ func (_m *HubClient) Stop() error {
 	return r0
 }

-type mockConstructorTestingTNewHubClient interface {
+// NewHubClient creates a new instance of HubClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
+// The first argument is typically a *testing.T value.
+func NewHubClient(t interface {
 	mock.TestingT
 	Cleanup(func())
-}
-
-// NewHubClient creates a new instance of HubClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
-func NewHubClient(t mockConstructorTestingTNewHubClient) *HubClient {
+}) *HubClient {
 	mock := &HubClient{}
 	mock.Mock.Test(t)
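A behavioral consequence of the mockery regeneration: mocks now panic with "no return value specified for ..." when a method is called without a configured return value, so every call a test exercises needs an explicit `.Return(...)`. Typical setup, assuming the repo's `mocks/settlement` package path and the `tsmock` alias used in the settlement tests:

```go
package example

import (
	"testing"

	tsmock "github.com/stretchr/testify/mock"

	mocks "github.com/dymensionxyz/dymint/mocks/settlement"
	"github.com/dymensionxyz/dymint/settlement"
)

func TestHubClientMockSetup(t *testing.T) {
	hubClientMock := mocks.NewHubClient(t)
	// Without this .Return, the regenerated mock panics instead of
	// returning zero values as older mockery versions did.
	hubClientMock.On("GetLatestBatch", tsmock.Anything).Return(nil, settlement.ErrBatchNotFound)

	_, err := hubClientMock.GetLatestBatch("rollapp-1")
	if err != settlement.ErrBatchNotFound {
		t.Fatalf("unexpected error: %v", err)
	}
}
```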
diff --git a/rpc/client/client.go b/rpc/client/client.go
index ec5688c88..cc75127de 100644
--- a/rpc/client/client.go
+++ b/rpc/client/client.go
@@ -124,60 +124,67 @@ func (c *Client) BroadcastTxCommit(ctx context.Context, tx types.Tx) (*ctypes.Re
 	// add to mempool and wait for CheckTx result
 	checkTxResCh := make(chan *abci.Response, 1)
 	err = c.node.Mempool.CheckTx(tx, func(res *abci.Response) {
-		checkTxResCh <- res
+		select {
+		case <-ctx.Done():
+		case checkTxResCh <- res:
+		}
 	}, mempool.TxInfo{})
 	if err != nil {
 		c.Logger.Error("Error on broadcastTxCommit", "err", err)
 		return nil, fmt.Errorf("error on broadcastTxCommit: %v", err)
 	}
-	checkTxResMsg := <-checkTxResCh
-	checkTxRes := checkTxResMsg.GetCheckTx()
-	if checkTxRes.Code != abci.CodeTypeOK {
-		return &ctypes.ResultBroadcastTxCommit{
-			CheckTx:   *checkTxRes,
-			DeliverTx: abci.ResponseDeliverTx{},
-			Hash:      tx.Hash(),
-		}, nil
-	}
+	select {
+	case <-ctx.Done():
+		return nil, fmt.Errorf("broadcast confirmation not received: %w", ctx.Err())
+	case checkTxResMsg := <-checkTxResCh:
+		checkTxRes := checkTxResMsg.GetCheckTx()
+		if checkTxRes.Code != abci.CodeTypeOK {
+			return &ctypes.ResultBroadcastTxCommit{
+				CheckTx:   *checkTxRes,
+				DeliverTx: abci.ResponseDeliverTx{},
+				Hash:      tx.Hash(),
+			}, nil
+		}

-	// broadcast tx
-	err = c.node.P2P.GossipTx(ctx, tx)
-	if err != nil {
-		return nil, fmt.Errorf("tx added to local mempool but failure to broadcast: %w", err)
-	}
+		// broadcast tx
+		err = c.node.P2P.GossipTx(ctx, tx)
+		if err != nil {
+			return nil, fmt.Errorf("tx added to local mempool but failure to broadcast: %w", err)
+		}

-	// Wait for the tx to be included in a block or timeout.
-	select {
-	case msg := <-deliverTxSub.Out(): // The tx was included in a block.
-		deliverTxRes := msg.Data().(types.EventDataTx)
-		return &ctypes.ResultBroadcastTxCommit{
-			CheckTx:   *checkTxRes,
-			DeliverTx: deliverTxRes.Result,
-			Hash:      tx.Hash(),
-			Height:    deliverTxRes.Height,
-		}, nil
-	case <-deliverTxSub.Cancelled():
-		var reason string
-		if deliverTxSub.Err() == nil {
-			reason = "Tendermint exited"
-		} else {
-			reason = deliverTxSub.Err().Error()
+		// Wait for the tx to be included in a block or timeout.
+		select {
+		case msg := <-deliverTxSub.Out(): // The tx was included in a block.
+			deliverTxRes := msg.Data().(types.EventDataTx)
+			return &ctypes.ResultBroadcastTxCommit{
+				CheckTx:   *checkTxRes,
+				DeliverTx: deliverTxRes.Result,
+				Hash:      tx.Hash(),
+				Height:    deliverTxRes.Height,
+			}, nil
+		case <-deliverTxSub.Cancelled():
+			var reason string
+			if deliverTxSub.Err() == nil {
+				reason = "Tendermint exited"
+			} else {
+				reason = deliverTxSub.Err().Error()
+			}
+			err = fmt.Errorf("deliverTxSub was cancelled (reason: %s)", reason)
+			c.Logger.Error("Error on broadcastTxCommit", "err", err)
+			return &ctypes.ResultBroadcastTxCommit{
+				CheckTx:   *checkTxRes,
+				DeliverTx: abci.ResponseDeliverTx{},
+				Hash:      tx.Hash(),
+			}, err
+		case <-time.After(c.config.TimeoutBroadcastTxCommit):
+			err = errors.New("timed out waiting for tx to be included in a block")
+			c.Logger.Error("Error on broadcastTxCommit", "err", err)
+			return &ctypes.ResultBroadcastTxCommit{
+				CheckTx:   *checkTxRes,
+				DeliverTx: abci.ResponseDeliverTx{},
+				Hash:      tx.Hash(),
+			}, err
 		}
-		err = fmt.Errorf("deliverTxSub was cancelled (reason: %s)", reason)
-		c.Logger.Error("Error on broadcastTxCommit", "err", err)
-		return &ctypes.ResultBroadcastTxCommit{
-			CheckTx:   *checkTxRes,
-			DeliverTx: abci.ResponseDeliverTx{},
-			Hash:      tx.Hash(),
-		}, err
-	case <-time.After(c.config.TimeoutBroadcastTxCommit):
-		err = errors.New("timed out waiting for tx to be included in a block")
-		c.Logger.Error("Error on broadcastTxCommit", "err", err)
-		return &ctypes.ResultBroadcastTxCommit{
-			CheckTx:   *checkTxRes,
-			DeliverTx: abci.ResponseDeliverTx{},
-			Hash:      tx.Hash(),
-		}, err
 	}
 }
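The CheckTx callback fix above is a general pattern worth isolating: never do a bare blocking send from a callback; select against `ctx.Done()` so an abandoned request cannot wedge the goroutine that fires the callback. A minimal sketch under those assumptions (channel and callback names are illustrative):

```go
// Context-aware channel send from a callback, as in the BroadcastTxCommit fix.
package main

import (
	"context"
	"fmt"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	results := make(chan int, 1)

	callback := func(res int) {
		select {
		case <-ctx.Done(): // caller gave up; drop the result instead of blocking
		case results <- res:
		}
	}

	go callback(42)

	select {
	case <-ctx.Done():
		fmt.Println("confirmation not received:", ctx.Err())
	case res := <-results:
		fmt.Println("got result:", res)
	}
}
```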
diff --git a/settlement/base.go b/settlement/base.go
index 07be29c5a..93f3a697b 100644
--- a/settlement/base.go
+++ b/settlement/base.go
@@ -3,12 +3,10 @@ package settlement
 import (
 	"context"
 	"fmt"
-	"sync/atomic"

 	"github.com/dymensionxyz/dymint/da"
 	"github.com/dymensionxyz/dymint/log"
 	"github.com/dymensionxyz/dymint/types"
-	"github.com/dymensionxyz/dymint/utils"
 	"github.com/tendermint/tendermint/libs/pubsub"
 )

@@ -16,7 +14,6 @@ import (
 type BaseLayerClient struct {
 	logger         log.Logger
 	pubsub         *pubsub.Server
-	latestHeight   uint64
 	sequencersList []*types.Sequencer
 	config         Config
 	ctx            context.Context
@@ -35,6 +32,7 @@ func WithHubClient(hubClient HubClient) Option {

 // Init is called once. it initializes the struct members.
 func (b *BaseLayerClient) Init(config Config, pubsub *pubsub.Server, logger log.Logger, options ...Option) error {
+	var err error
 	b.config = config
 	b.pubsub = pubsub
 	b.logger = logger
@@ -44,19 +42,6 @@ func (b *BaseLayerClient) Init(config Config, pubsub *pubsub.Server, logger log.
 		apply(b)
 	}

-	latestBatch, err := b.RetrieveBatch()
-	var endHeight uint64
-	if err != nil {
-		if err == ErrBatchNotFound {
-			endHeight = 0
-		} else {
-			return err
-		}
-	} else {
-		endHeight = latestBatch.EndHeight
-	}
-	b.latestHeight = endHeight
-	b.logger.Info("Updated latest height from settlement layer", "latestHeight", endHeight)
 	b.sequencersList, err = b.fetchSequencersList()
 	if err != nil {
 		return err
@@ -70,11 +55,6 @@ func (b *BaseLayerClient) Init(config Config, pubsub *pubsub.Server, logger log.
 func (b *BaseLayerClient) Start() error {
 	b.logger.Debug("settlement Layer Client starting.")

-	// Wait until the state updates handler is ready
-	ready := make(chan bool, 1)
-	go b.stateUpdatesHandler(ready)
-	<-ready
-
 	err := b.client.Start()
 	if err != nil {
 		return err
@@ -95,13 +75,9 @@ func (b *BaseLayerClient) Stop() error {
 }

 // SubmitBatch tries submitting the batch in an async broadcast mode to the settlement layer. Events are emitted on success or failure.
-func (b *BaseLayerClient) SubmitBatch(batch *types.Batch, daClient da.Client, daResult *da.ResultSubmitBatch) {
+func (b *BaseLayerClient) SubmitBatch(batch *types.Batch, daClient da.Client, daResult *da.ResultSubmitBatch) error {
 	b.logger.Debug("Submitting batch to settlement layer", "start height", batch.StartHeight, "end height", batch.EndHeight)
-	err := b.validateBatch(batch)
-	if err != nil {
-		panic(err)
-	}
-	b.client.PostBatch(batch, daClient, daResult)
+	return b.client.PostBatch(batch, daClient, daResult)
 }

 // RetrieveBatch Gets the batch which contains the given slHeight. Empty slHeight returns the latest batch.
@@ -148,44 +124,3 @@ func (b *BaseLayerClient) fetchSequencersList() ([]*types.Sequencer, error) {
 	}
 	return sequencers, nil
 }
-
-func (b *BaseLayerClient) validateBatch(batch *types.Batch) error {
-	if batch.StartHeight != atomic.LoadUint64(&b.latestHeight)+1 {
-		return fmt.Errorf("batch start height != latest height + 1. StartHeight %d, lastetHeight %d", batch.StartHeight, atomic.LoadUint64(&b.latestHeight))
-	}
-	if batch.EndHeight < batch.StartHeight {
-		return fmt.Errorf("batch end height must be greater than start height. EndHeight %d, StartHeight %d", batch.EndHeight, batch.StartHeight)
-	}
-	return nil
-}
-
-func (b *BaseLayerClient) stateUpdatesHandler(ready chan bool) {
-	b.logger.Info("started state updates handler loop")
-	subscription, err := b.pubsub.Subscribe(b.ctx, "stateUpdatesHandler", EventQueryNewSettlementBatchAccepted)
-	if err != nil {
-		b.logger.Error("failed to subscribe to state update events", "error", err)
-		panic(err)
-	}
-	ready <- true
-	for {
-		select {
-		case event := <-subscription.Out():
-			eventData := event.Data().(*EventDataNewSettlementBatchAccepted)
-			b.logger.Debug("received state update event", "latestHeight", eventData.EndHeight)
-			atomic.StoreUint64(&b.latestHeight, eventData.EndHeight)
-			// Emit new batch event
-			newBatchEventData := &EventDataNewBatchAccepted{
-				EndHeight:  eventData.EndHeight,
-				StateIndex: eventData.StateIndex,
-			}
-			utils.SubmitEventOrPanic(b.ctx, b.pubsub, newBatchEventData,
-				map[string][]string{EventTypeKey: {EventNewBatchAccepted}})
-		case <-subscription.Cancelled():
-			b.logger.Info("stateUpdatesHandler subscription canceled")
-			return
-		case <-b.ctx.Done():
-			b.logger.Info("Context done. Exiting state update handler")
-			return
-		}
-	}
-}
diff --git a/settlement/dymension/dymension.go b/settlement/dymension/dymension.go
index 4b618510a..4a01071f9 100644
--- a/settlement/dymension/dymension.go
+++ b/settlement/dymension/dymension.go
@@ -201,17 +201,18 @@ func (d *HubClient) Stop() error {
 // PostBatch posts a batch to the Dymension Hub. it tries to post the batch until it is accepted by the settlement layer.
 // it emits success and failure events to the event bus accordingly.
-func (d *HubClient) PostBatch(batch *types.Batch, daClient da.Client, daResult *da.ResultSubmitBatch) {
+func (d *HubClient) PostBatch(batch *types.Batch, daClient da.Client, daResult *da.ResultSubmitBatch) error {
 	msgUpdateState, err := d.convertBatchToMsgUpdateState(batch, daClient, daResult)
 	if err != nil {
-		panic(err)
+		return err
 	}
+
+	// TODO: probably should be changed to a channel, as the eventHandler also lives in the HubClient and is the one producing the event
 	postBatchSubscriberClient := fmt.Sprintf("%s-%d-%s", postBatchSubscriberPrefix, batch.StartHeight, uuid.New().String())
 	subscription, err := d.pubsub.Subscribe(d.ctx, postBatchSubscriberClient, settlement.EventQueryNewSettlementBatchAccepted)
 	if err != nil {
 		d.logger.Error("failed to subscribe to state update events", "err", err)
-		panic(err)
+		return err
 	}

 	//nolint:errcheck
@@ -223,7 +224,7 @@ func (d *HubClient) PostBatch(batch *types.Batch, daClient da.Client, daResult *
 	for {
 		select {
 		case <-d.ctx.Done():
-			return
+			return d.ctx.Err()
 		default:
 			// Try submitting the batch
 			err := d.submitBatch(msgUpdateState)
@@ -246,17 +247,17 @@ func (d *HubClient) PostBatch(batch *types.Batch, daClient da.Client, daResult *

 			select {
 			case <-d.ctx.Done():
-				return
+				return d.ctx.Err()
 			case <-subscription.Cancelled():
-				d.logger.Debug("SLBatchPost subscription canceled")
-				return
+				err = fmt.Errorf("subscription canceled")
+				return err
 			case <-subscription.Out():
 				d.logger.Info("Batch accepted by settlement layer. Emitting healthy event", "startHeight", batch.StartHeight, "endHeight", batch.EndHeight)
 				heatlhEventData := &settlement.EventDataSettlementHealthStatus{Healthy: true}
 				utils.SubmitEventOrPanic(d.ctx, d.pubsub, heatlhEventData,
 					map[string][]string{settlement.EventTypeKey: {settlement.EventSettlementHealthStatus}})
-				return
+				return nil
 			case <-ticker.C:
 				// Before emitting unhealthy event, check if the batch was accepted by the settlement layer and
 				// we've just missed the event.
@@ -274,18 +275,11 @@ func (d *HubClient) PostBatch(batch *types.Batch, daClient da.Client, daResult *
 				}

 				d.logger.Info("Batch accepted by settlement layer", "startHeight", includedBatch.StartHeight, "endHeight", includedBatch.EndHeight)
-				// Emit batch accepted event
-				batchAcceptedEvent := &settlement.EventDataNewSettlementBatchAccepted{
-					EndHeight:  includedBatch.EndHeight,
-					StateIndex: includedBatch.StateIndex,
-				}
-				utils.SubmitEventOrPanic(d.ctx, d.pubsub, batchAcceptedEvent,
-					map[string][]string{settlement.EventTypeKey: {settlement.EventNewSettlementBatchAccepted}})
 				// Emit health event
 				heatlhEventData := &settlement.EventDataSettlementHealthStatus{Healthy: true}
 				utils.SubmitEventOrPanic(d.ctx, d.pubsub, heatlhEventData,
 					map[string][]string{settlement.EventTypeKey: {settlement.EventSettlementHealthStatus}})
-				return
+				return nil
 			}
 		}
 	}
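After this change, `PostBatch` follows a "submit, then wait for event or re-check" flow that returns errors instead of panicking. A simplified skeleton of that flow, with illustrative stand-ins for the real submit/accepted/inclusion plumbing (the real code also retries the submission itself):

```go
// Simplified PostBatch-style flow: submit, then wait for an acceptance
// event, a timeout tick that re-checks inclusion, or context cancellation.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

func postBatch(ctx context.Context, submit func() error, accepted <-chan struct{}, included func() bool) error {
	if err := submit(); err != nil {
		return err
	}
	ticker := time.NewTicker(50 * time.Millisecond)
	defer ticker.Stop()
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-accepted:
		return nil // batch accepted by the settlement layer
	case <-ticker.C:
		if included() { // maybe we just missed the event
			return nil
		}
		return errors.New("batch not accepted")
	}
}

func main() {
	accepted := make(chan struct{}, 1)
	accepted <- struct{}{}
	err := postBatch(context.Background(), func() error { return nil }, accepted, func() bool { return false })
	fmt.Println(err) // <nil>
}
```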
diff --git a/settlement/dymension/dymension_test.go b/settlement/dymension/dymension_test.go
index 245d9ea72..861b5ff9a 100644
--- a/settlement/dymension/dymension_test.go
+++ b/settlement/dymension/dymension_test.go
@@ -106,57 +106,49 @@ func TestPostBatch(t *testing.T) {
 	HealthSubscription, err := pubsubServer.Subscribe(context.Background(), "testPostBatch", settlement.EventQuerySettlementHealthStatus)
 	assert.NoError(t, err)

-	// Subscribe to the batch accepted event
-	BatchAcceptedSubscription, err := pubsubServer.Subscribe(context.Background(), "testPostBatch", settlement.EventQueryNewSettlementBatchAccepted)
-	assert.NoError(t, err)
-
 	cases := []struct {
-		name                       string
-		isBatchSubmitSuccess       bool
-		isBatchAcceptedHubEvent    bool
-		shouldMockBatchIncluded    bool
-		isBatchIncludedSuccess     bool
-		expectedBatchAcceptedEvent bool
-		expectedHealthEventValue   bool
-		expectedError              error
+		name                     string
+		isBatchSubmitSuccess     bool
+		isBatchAcceptedHubEvent  bool
+		shouldMockBatchIncluded  bool
+		isBatchIncludedSuccess   bool
+		expectedHealthEventValue bool
+		expectedError            error
 	}{
 		{
-			name:                       "TestSubmitBatchFailure",
-			isBatchSubmitSuccess:       false,
-			isBatchAcceptedHubEvent:    false,
-			shouldMockBatchIncluded:    true,
-			isBatchIncludedSuccess:     false,
-			expectedHealthEventValue:   false,
-			expectedBatchAcceptedEvent: false,
-			expectedError:              submitBatchError,
+			name:                     "TestSubmitBatchFailure",
+			isBatchSubmitSuccess:     false,
+			isBatchAcceptedHubEvent:  false,
+			shouldMockBatchIncluded:  true,
+			isBatchIncludedSuccess:   false,
+			expectedHealthEventValue: false,
+			expectedError:            submitBatchError,
 		},
 		{
-			name:                       "TestSubmitBatchSuccessNoBatchAcceptedHubEventNotIncluded",
-			isBatchSubmitSuccess:       true,
-			isBatchAcceptedHubEvent:    false,
-			shouldMockBatchIncluded:    true,
-			isBatchIncludedSuccess:     false,
-			expectedHealthEventValue:   false,
-			expectedBatchAcceptedEvent: false,
-			expectedError:              settlement.ErrBatchNotAccepted,
+			name:                     "TestSubmitBatchSuccessNoBatchAcceptedHubEventNotIncluded",
+			isBatchSubmitSuccess:     true,
+			isBatchAcceptedHubEvent:  false,
+			shouldMockBatchIncluded:  true,
+			isBatchIncludedSuccess:   false,
+			expectedHealthEventValue: false,
+			expectedError:            settlement.ErrBatchNotAccepted,
 		},
 		{
-			name:                       "TestSubmitBatchSuccessNotAcceptedYesIncluded",
-			isBatchSubmitSuccess:       true,
-			isBatchAcceptedHubEvent:    false,
-			shouldMockBatchIncluded:    true,
-			isBatchIncludedSuccess:     true,
-			expectedHealthEventValue:   true,
-			expectedBatchAcceptedEvent: true,
+			name:                     "TestSubmitBatchSuccessNotAcceptedYesIncluded",
+			isBatchSubmitSuccess:     true,
+			isBatchAcceptedHubEvent:  false,
+			shouldMockBatchIncluded:  true,
+			isBatchIncludedSuccess:   true,
+			expectedHealthEventValue: true,
+			expectedError:            nil,
 		},
 		{
-			name:                       "TestSubmitBatchSuccessAndAccepted",
-			isBatchSubmitSuccess:       true,
-			isBatchAcceptedHubEvent:    true,
-			shouldMockBatchIncluded:    false,
-			expectedHealthEventValue:   true,
-			expectedError:              nil,
-			expectedBatchAcceptedEvent: true,
+			name:                     "TestSubmitBatchSuccessAndAccepted",
+			isBatchSubmitSuccess:     true,
+			isBatchAcceptedHubEvent:  true,
+			shouldMockBatchIncluded:  false,
+			expectedHealthEventValue: true,
+			expectedError:            nil,
 		},
 	}

@@ -164,12 +156,7 @@ func TestPostBatch(t *testing.T) {
 		t.Run(c.name, func(t *testing.T) {
 			// Init the wait group and set the number of expected events
 			var wg sync.WaitGroup
-			var eventsCount int
-			if c.expectedBatchAcceptedEvent {
-				eventsCount = 2
-			} else {
-				eventsCount = 1
-			}
+			eventsCount := 1
 			wg.Add(eventsCount)
 			// Reset the mock functions
 			testutil.UnsetMockFn(cosmosClientMock.On("BroadcastTx"))
@@ -215,21 +202,7 @@ func TestPostBatch(t *testing.T) {
 				}
 				wg.Done()
 			}()
-			if c.expectedBatchAcceptedEvent {
-				go func() {
-					select {
-					case batchAcceptedEvent := <-BatchAcceptedSubscription.Out():
-						t.Logf("got batch accepted event: %v", batchAcceptedEvent)
-						batchAcceptedEventData := batchAcceptedEvent.Data().(*settlement.EventDataNewSettlementBatchAccepted)
-						assert.Equal(t, batchAcceptedEventData.EndHeight, batch.EndHeight)
-						atomic.AddInt64(&eventsReceivedCount, 1)
-					case <-time.After(10 * time.Second):
-						t.Error("Didn't receive batch accepted event")
-					}
-					wg.Done()
-				}()
-			}
 			// Post the batch
 			go hubClient.PostBatch(batch, da.Mock, &da.ResultSubmitBatch{})
 			// Wait for the batch to be submitted and submit an event notifying that the batch was accepted
diff --git a/settlement/events.go b/settlement/events.go
index 1993a671b..f101f8053 100644
--- a/settlement/events.go
+++ b/settlement/events.go
@@ -18,10 +18,8 @@ const (
 const (
 	// This event should be emitted internally in order to communicate between the settlement layer and the hub client
 	EventNewSettlementBatchAccepted = "EventNewSettlementBatchAccepted"
-	// This event should be emitted externally when a batch is accepted
-	EventNewBatchAccepted = "EventNewBatchAccepted"
-	EventSequencersListUpdated  = "SequencersListUpdated"
-	EventSettlementHealthStatus = "SettlementHealthStatus"
+	EventSequencersListUpdated      = "SequencersListUpdated"
+	EventSettlementHealthStatus     = "SettlementHealthStatus"
 )

 // EventDataNewBatchAccepted defines the structure of the event data for the EventNewBatchAccepted
@@ -52,7 +50,6 @@ type EventDataSettlementHealthStatus struct {
 // Define queries
 var (
 	EventQueryNewSettlementBatchAccepted = QueryForEvent(EventNewSettlementBatchAccepted)
-	EventQueryNewBatchAccepted           = QueryForEvent(EventNewBatchAccepted)
 	EventQuerySettlementHealthStatus     = QueryForEvent(EventSettlementHealthStatus)
 )
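With `EventNewBatchAccepted` removed, `EventNewSettlementBatchAccepted` is the single accepted-batch event, and everything flows through tendermint's pubsub: publishers tag the message under a type key, subscribers filter with the corresponding query. A sketch of that mechanism, using an illustrative event key (the real key comes from `settlement.EventTypeKey` / `QueryForEvent`):

```go
// Tag-based publish/subscribe as used by the settlement events.
package main

import (
	"context"
	"fmt"

	"github.com/tendermint/tendermint/libs/pubsub"
	"github.com/tendermint/tendermint/libs/pubsub/query"
)

func main() {
	s := pubsub.NewServer()
	if err := s.Start(); err != nil {
		panic(err)
	}
	defer func() { _ = s.Stop() }()

	ctx := context.Background()
	q := query.MustParse("settlement.event = 'EventNewSettlementBatchAccepted'")
	sub, err := s.Subscribe(ctx, "demo", q)
	if err != nil {
		panic(err)
	}

	// Publisher side: the events map determines which subscribers match.
	_ = s.PublishWithEvents(ctx, "batch accepted", map[string][]string{
		"settlement.event": {"EventNewSettlementBatchAccepted"},
	})

	msg := <-sub.Out()
	fmt.Println(msg.Data()) // batch accepted
}
```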
diff --git a/settlement/grpc/grpc.go b/settlement/grpc/grpc.go
index 8dcd036a7..2b6036ab2 100644
--- a/settlement/grpc/grpc.go
+++ b/settlement/grpc/grpc.go
@@ -201,17 +201,17 @@ func (c *HubGrpcClient) Stop() error {
 }

 // PostBatch saves the batch to the kv store
-func (c *HubGrpcClient) PostBatch(batch *types.Batch, daClient da.Client, daResult *da.ResultSubmitBatch) {
+func (c *HubGrpcClient) PostBatch(batch *types.Batch, daClient da.Client, daResult *da.ResultSubmitBatch) error {
 	settlementBatch := c.convertBatchtoSettlementBatch(batch, daClient, daResult)
 	c.saveBatch(settlementBatch)
-	go func() {
-		// sleep for 10 miliseconds to mimic a delay in batch acceptance
-		time.Sleep(10 * time.Millisecond)
-		err := c.pubsub.PublishWithEvents(context.Background(), &settlement.EventDataNewSettlementBatchAccepted{EndHeight: settlementBatch.EndHeight}, map[string][]string{settlement.EventTypeKey: {settlement.EventNewSettlementBatchAccepted}})
-		if err != nil {
-			panic(err)
-		}
-	}()
+
+	// sleep for 10 milliseconds to mimic a delay in batch acceptance
+	time.Sleep(10 * time.Millisecond)
+	err := c.pubsub.PublishWithEvents(context.Background(), &settlement.EventDataNewSettlementBatchAccepted{EndHeight: settlementBatch.EndHeight}, map[string][]string{settlement.EventTypeKey: {settlement.EventNewSettlementBatchAccepted}})
+	if err != nil {
+		return err
+	}
+	return nil
 }

 // GetLatestBatch returns the latest batch from the kv store
diff --git a/settlement/mock/mock.go b/settlement/mock/mock.go
index 5f32f6d31..707715be0 100644
--- a/settlement/mock/mock.go
+++ b/settlement/mock/mock.go
@@ -150,7 +150,7 @@ func (c *HubClient) Stop() error {
 }

 // PostBatch saves the batch to the kv store
-func (c *HubClient) PostBatch(batch *types.Batch, daClient da.Client, daResult *da.ResultSubmitBatch) {
+func (c *HubClient) PostBatch(batch *types.Batch, daClient da.Client, daResult *da.ResultSubmitBatch) error {
 	settlementBatch := c.convertBatchtoSettlementBatch(batch, daClient, daResult)
 	c.saveBatch(settlementBatch)
 	go func() {
@@ -161,6 +161,7 @@ func (c *HubClient) PostBatch(batch *types.Batch, daClient da.Client, daResult *
 			panic(err)
 		}
 	}()
+	return nil
 }

 // GetLatestBatch returns the latest batch from the kv store
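Every `HubClient` implementation (dymension, grpc, mock) now has to surface failures through the returned error rather than panicking. A minimal in-memory stub satisfying the updated shape of the interface (field names and the `batch` helper type are illustrative, not the real dymint types):

```go
// Toy HubClient-style implementation returning errors instead of panicking.
package main

import "fmt"

type batch struct{ start, end uint64 }

type stubHubClient struct {
	batches []batch
}

func (c *stubHubClient) Start() error { return nil }
func (c *stubHubClient) Stop() error  { return nil }

// PostBatch persists the batch and reports invalid input as an error.
func (c *stubHubClient) PostBatch(b batch) error {
	if b.end < b.start {
		return fmt.Errorf("invalid batch [%d, %d]", b.start, b.end)
	}
	c.batches = append(c.batches, b)
	return nil
}

func main() {
	c := &stubHubClient{}
	fmt.Println(c.PostBatch(batch{1, 5})) // <nil>
	fmt.Println(c.PostBatch(batch{7, 6})) // invalid batch [7, 6]
}
```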
diff --git a/settlement/settlement.go b/settlement/settlement.go
index 9d55cebdc..6bc01bea7 100644
--- a/settlement/settlement.go
+++ b/settlement/settlement.go
@@ -97,7 +97,7 @@ type LayerI interface {

 	// SubmitBatch tries submitting the batch in an async way to the settlement layer. This should create a transaction which (potentially)
 	// triggers a state transition in the settlement layer. Events are emitted on success or failure.
-	SubmitBatch(batch *types.Batch, daClient da.Client, daResult *da.ResultSubmitBatch)
+	SubmitBatch(batch *types.Batch, daClient da.Client, daResult *da.ResultSubmitBatch) error

 	// RetrieveBatch Gets the batch which contains the given height. Empty height returns the latest batch.
 	RetrieveBatch(stateIndex ...uint64) (*ResultRetrieveBatch, error)
@@ -115,7 +115,7 @@ type LayerI interface {
 type HubClient interface {
 	Start() error
 	Stop() error
-	PostBatch(batch *types.Batch, daClient da.Client, daResult *da.ResultSubmitBatch)
+	PostBatch(batch *types.Batch, daClient da.Client, daResult *da.ResultSubmitBatch) error
 	GetLatestBatch(rollappID string) (*ResultRetrieveBatch, error)
 	GetBatchAtIndex(rollappID string, index uint64) (*ResultRetrieveBatch, error)
 	GetSequencers(rollappID string) ([]*types.Sequencer, error)
diff --git a/settlement/settlement_test.go b/settlement/settlement_test.go
index 573d9a093..7a65edab4 100644
--- a/settlement/settlement_test.go
+++ b/settlement/settlement_test.go
@@ -10,7 +10,6 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"github.com/tendermint/tendermint/libs/pubsub"
-	tmtypes "github.com/tendermint/tendermint/types"

 	"github.com/cosmos/cosmos-sdk/crypto/keys/ed25519"
 	"github.com/dymensionxyz/dymint/da"
@@ -20,8 +19,6 @@ import (
 	"github.com/dymensionxyz/dymint/testutil"
 	"github.com/dymensionxyz/dymint/types"
 	tsmock "github.com/stretchr/testify/mock"
-	ce "github.com/tendermint/tendermint/crypto/encoding"
-	pc "github.com/tendermint/tendermint/proto/tendermint/crypto"
 )

 const batchSize = 5
@@ -42,44 +39,6 @@ func TestLifecycle(t *testing.T) {
 	require.NoError(err)
 }

-func TestInvalidSubmit(t *testing.T) {
-	assert := assert.New(t)
-	settlementClient := registry.GetClient(registry.Mock)
-	initClient(t, settlementClient)
-
-	// Create cases
-	cases := []struct {
-		startHeight uint64
-		endHeight   uint64
-		shouldPanic bool
-	}{
-		{startHeight: 1, endHeight: batchSize, shouldPanic: false},
-		// batch with endHight < startHeight
-		{startHeight: batchSize + 2, endHeight: 1, shouldPanic: true},
-		// batch with startHeight != previousEndHeight + 1
-		{startHeight: batchSize, endHeight: 1 + batchSize + batchSize, shouldPanic: true},
-	}
-	for _, c := range cases {
-		batch := &types.Batch{
-			StartHeight: c.startHeight,
-			EndHeight:   c.endHeight,
-		}
-		daResult := &da.ResultSubmitBatch{
-			BaseResult: da.BaseResult{
-				DAHeight: c.endHeight,
-			},
-		}
-		if c.shouldPanic {
-			assert.Panics(func() {
-				settlementClient.SubmitBatch(batch, da.Mock, daResult)
-			})
-		} else {
-			settlementClient.SubmitBatch(batch, da.Mock, daResult)
-		}
-	}
-
-}
-
 func TestSubmitAndRetrieve(t *testing.T) {
 	require := require.New(t)
 	assert := assert.New(t)
@@ -137,7 +96,6 @@ func TestSubmitAndRetrieve(t *testing.T) {
 func TestGetSequencersEmptyList(t *testing.T) {
 	settlementClient := registry.GetClient(registry.Mock)
 	hubClientMock := mocks.NewHubClient(t)
-	hubClientMock.On("GetLatestBatch", tsmock.Anything).Return(nil, settlement.ErrBatchNotFound)
 	hubClientMock.On("GetSequencers", tsmock.Anything, tsmock.Anything).Return(nil, settlement.ErrNoSequencerForRollapp)

 	options := []settlement.Option{
 		settlement.WithHubClient(hubClientMock),
@@ -154,7 +112,6 @@ func TestGetSequencers(t *testing.T) {
 	hubClientMock := mocks.NewHubClient(t)
 	hubClientMock.On("Start", tsmock.Anything).Return(nil)
 	hubClientMock.On("Stop", tsmock.Anything).Return(nil)
-	hubClientMock.On("GetLatestBatch", tsmock.Anything).Return(nil, settlement.ErrBatchNotFound)
 	// Mock a sequencer response by the sequencerByRollapp query
 	totalSequencers := 5
 	sequencers, proposer := generateSequencers(totalSequencers)
@@ -183,7 +140,6 @@ func TestGetSequencers(t *testing.T) {
 		}
 	}
 	assert.Equal(t, inactiveSequencerAmount, totalSequencers-1)
-
 }

 /* -------------------------------------------------------------------------- */
@@ -202,13 +158,6 @@ func initClient(t *testing.T, settlementlc settlement.LayerI, options ...settlem
 	require.NoError(err)
 }

-func generateProtoPubKey(t *testing.T) pc.PublicKey {
-	pubKey := tmtypes.NewMockPV().PrivKey.PubKey()
-	protoPubKey, err := ce.PubKeyToProto(pubKey)
-	require.NoError(t, err)
-	return protoPubKey
-}
-
 func generateSequencers(count int) ([]*types.Sequencer, *types.Sequencer) {
 	sequencers := make([]*types.Sequencer, count)
 	proposer := &types.Sequencer{