diff --git a/block/manager.go b/block/manager.go index eb260fe6e..cc17905df 100644 --- a/block/manager.go +++ b/block/manager.go @@ -177,6 +177,9 @@ func (m *Manager) Start(ctx context.Context) error { } /* ----------------------------- sequencer mode ----------------------------- */ + // Subscribe to batch events, to update last submitted height in case batch confirmation was lost. This could happen if the sequencer crash/restarted just after submitting a batch to the settelement and by the time we query the last batch, this batch wasn't accepted yet. + go uevent.MustSubscribe(ctx, m.Pubsub, "updateSubmittedHeightLoop", settlement.EventQueryNewSettlementBatchAccepted, m.UpdateLastSubmittedHeight, m.logger) + // Sequencer must wait till DA is synced to start submitting blobs <-m.DAClient.Synced() err = m.syncFromSettlement() diff --git a/block/submit.go b/block/submit.go index cf0fc5280..e582b12d5 100644 --- a/block/submit.go +++ b/block/submit.go @@ -8,9 +8,11 @@ import ( "time" "github.com/dymensionxyz/gerr-cosmos/gerrc" + "github.com/tendermint/tendermint/libs/pubsub" "golang.org/x/sync/errgroup" "github.com/dymensionxyz/dymint/da" + "github.com/dymensionxyz/dymint/settlement" "github.com/dymensionxyz/dymint/types" uatomic "github.com/dymensionxyz/dymint/utils/atomic" uchannel "github.com/dymensionxyz/dymint/utils/channel" @@ -115,6 +117,12 @@ func SubmitLoopInner( logger.Error("Create and submit batch", "err", err, "pending", pending) panic(err) } + // this could happen if we timed-out waiting for acceptance in the previous iteration, but the batch was indeed submitted. + // we panic here cause restarting may reset the last batch submitted counter and the sequencer can potentially resume submitting batches. + if errors.Is(err, gerrc.ErrAlreadyExists) { + logger.Debug("Batch already accepted", "err", err, "pending", pending) + panic(err) + } return err } timeLastSubmission = time.Now() @@ -224,6 +232,7 @@ func (m *Manager) SubmitBatch(batch *types.Batch) error { if err != nil { return fmt.Errorf("sl client submit batch: start height: %d: end height: %d: %w", batch.StartHeight(), batch.EndHeight(), err) } + m.logger.Info("Submitted batch to SL.", "start height", batch.StartHeight(), "end height", batch.EndHeight()) types.RollappHubHeightGauge.Set(float64(batch.EndHeight())) @@ -263,3 +272,17 @@ func (m *Manager) GetUnsubmittedBytes() int { func (m *Manager) GetUnsubmittedBlocks() uint64 { return m.State.Height() - m.LastSubmittedHeight.Load() } + +// UpdateLastSubmittedHeight will update last height submitted height upon events. +// This may be necessary in case we crashed/restarted before getting response for our submission to the settlement layer. +func (m *Manager) UpdateLastSubmittedHeight(event pubsub.Message) { + eventData, ok := event.Data().(*settlement.EventDataNewBatchAccepted) + if !ok { + m.logger.Error("onReceivedBatch", "err", "wrong event data received") + return + } + h := eventData.EndHeight + if m.LastSubmittedHeight.Load() < h { + m.LastSubmittedHeight.Store(h) + } +} diff --git a/settlement/dymension/dymension.go b/settlement/dymension/dymension.go index a5ee24adf..527612b2f 100644 --- a/settlement/dymension/dymension.go +++ b/settlement/dymension/dymension.go @@ -151,11 +151,6 @@ func (c *Client) SubmitBatch(batch *types.Batch, daClient da.Client, daResult *d return err }) if err != nil { - // this could happen if we timed-out waiting for acceptance in the previous iteration, but the batch was indeed submitted - if errors.Is(err, gerrc.ErrAlreadyExists) { - c.logger.Debug("Batch already accepted", "startHeight", batch.StartHeight(), "endHeight", batch.EndHeight()) - return nil - } return fmt.Errorf("broadcast batch: %w", err) } diff --git a/settlement/dymension/dymension_test.go b/settlement/dymension/dymension_test.go index 3a89d21c3..d804bef78 100644 --- a/settlement/dymension/dymension_test.go +++ b/settlement/dymension/dymension_test.go @@ -67,76 +67,6 @@ func TestGetSequencers(t *testing.T) { require.Len(sequencers, count) } -// TestPostBatchRPCError should tests the scenario where batch submitted but the acceptance signal failed. -// we expect an attempt to broadcast the batch again, and receive "already submitted" error response. -func TestPostBatchRPCError(t *testing.T) { - require := require.New(t) - pubsubServer := pubsub.NewServer() - err := pubsubServer.Start() - require.NoError(err) - - // Create a mock cosmos client - cosmosClientMock := dymensionmock.NewMockCosmosClient(t) - sequencerQueryClientMock := sequencertypesmock.NewMockQueryClient(t) - rollappQueryClientMock := rollapptypesmock.NewMockQueryClient(t) - cosmosClientMock.On("GetRollappClient").Return(rollappQueryClientMock) - cosmosClientMock.On("GetSequencerClient").Return(sequencerQueryClientMock) - accountPubkey, err := sdkcodectypes.NewAnyWithValue(secp256k1.GenPrivKey().PubKey()) - require.NoError(err) - cosmosClientMock.On("GetAccount", mock.Anything).Return(cosmosaccount.Account{Record: &keyring.Record{PubKey: accountPubkey}}, nil) - - options := []settlement.Option{ - dymension.WithCosmosClient(cosmosClientMock), - dymension.WithBatchAcceptanceTimeout(time.Millisecond * 300), - dymension.WithBatchAcceptanceAttempts(2), - } - - // Create a batch which will be submitted - proposerKey, _, err := crypto.GenerateEd25519Key(nil) - require.NoError(err) - batch, err := testutil.GenerateBatch(2, 2, proposerKey) - require.NoError(err) - - hubClient := dymension.Client{} - err = hubClient.Init(settlement.Config{}, "rollappTest", pubsubServer, log.TestingLogger(), options...) - require.NoError(err) - - // submit passes - cosmosClientMock.On("BroadcastTx", mock.Anything, mock.Anything).Return(cosmosclient.Response{TxResponse: &types.TxResponse{Code: 0}}, nil).Once() - - // return old batch for inclusion check - daMetaData := &da.DASubmitMetaData{ - Height: 1, - Client: da.Mock, - } - rollappQueryClientMock.On("StateInfo", mock.Anything, mock.Anything).Return( - &rollapptypes.QueryGetStateInfoResponse{StateInfo: rollapptypes.StateInfo{ - StartHeight: 1, StateInfoIndex: rollapptypes.StateInfoIndex{Index: 1}, DAPath: daMetaData.ToPath(), NumBlocks: 1, - }}, - nil).Times(2) - - // submit returns already exists error - submitBatchError := errors.New("rpc error: code = Unknown desc = rpc error: code = Unknown desc = failed to execute message; message index: 0: expected height (5), but received (4): start-height does not match rollapps state") - cosmosClientMock.On("BroadcastTx", mock.Anything, mock.Anything).Return(cosmosclient.Response{TxResponse: &types.TxResponse{Code: 1}}, submitBatchError).Once() - - resultSubmitBatch := &da.ResultSubmitBatch{} - resultSubmitBatch.SubmitMetaData = &da.DASubmitMetaData{} - errChan := make(chan error, 1) // Create a channel to receive an error from the goroutine - // Post the batch in a goroutine and capture any error. - go func() { - err := hubClient.SubmitBatch(batch, da.Mock, resultSubmitBatch) - errChan <- err // Send any error to the errChan - }() - - // Use a select statement to wait for a potential error or a timeout. - select { - case err = <-errChan: - case <-time.After(3 * time.Second): - err = errors.New("timeout") - } - assert.NoError(t, err) -} - // TestPostBatch should test the following: // 1. Batch fails to submit emits unhealthy event // 2. Batch is submitted successfully but not accepted which emits unhealthy event