diff --git a/node/node_test.go b/node/node_test.go index 7c5b8c79e..697dc9bc7 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -73,9 +73,9 @@ func TestMempoolDirectly(t *testing.T) { RPC: config.RPCConfig{}, MempoolConfig: *tmcfg.DefaultMempoolConfig(), BlockManagerConfig: config.BlockManagerConfig{ - BlockTime: 100 * time.Millisecond, + BlockTime: 1 * time.Second, BatchSubmitMaxTime: 60 * time.Second, - BlockBatchMaxSizeBytes: 1000, + BlockBatchMaxSizeBytes: 100000, MaxSupportedBatchSkew: 10, }, DALayer: "mock", diff --git a/settlement/dymension/dymension.go b/settlement/dymension/dymension.go index cb559ce08..22f19089d 100644 --- a/settlement/dymension/dymension.go +++ b/settlement/dymension/dymension.go @@ -2,7 +2,9 @@ package dymension import ( "context" + "errors" "fmt" + "strings" "time" "github.com/dymensionxyz/dymint/gerr" @@ -153,13 +155,15 @@ func (c *Client) SubmitBatch(batch *types.Batch, daClient da.Client, daResult *d //nolint:errcheck defer c.pubsub.Unsubscribe(c.ctx, postBatchSubscriberClient, settlement.EventQueryNewSettlementBatchAccepted) - // Try submitting the batch to the settlement layer: - // 1. broadcast the transaction to the blockchain (with retries). - // 2. wait for the batch to be accepted by the settlement layer. for { + // broadcast loop: broadcast the transaction to the blockchain (with infinite retries). 
err := c.RunWithRetryInfinitely(func() error { err := c.broadcastBatch(msgUpdateState) if err != nil { + if errors.Is(err, gerr.ErrAlreadyExist) { + return retry.Unrecoverable(err) + } + c.logger.Error( "Submit batch", "startHeight", @@ -173,13 +177,18 @@ func (c *Client) SubmitBatch(batch *types.Batch, daClient da.Client, daResult *d return err }) if err != nil { + // this could happen if we timed-out waiting for acceptance in the previous iteration, but the batch was indeed submitted + if errors.Is(err, gerr.ErrAlreadyExist) { + c.logger.Debug("Batch already accepted", "startHeight", batch.StartHeight, "endHeight", batch.EndHeight) + return nil + } return fmt.Errorf("broadcast batch: %w", err) } // Batch was submitted successfully. Wait for it to be accepted by the settlement layer. timer := time.NewTimer(c.batchAcceptanceTimeout) defer timer.Stop() - attempt := uint64(0) + attempt := uint64(1) for { select { @@ -193,37 +202,47 @@ func (c *Client) SubmitBatch(batch *types.Batch, daClient da.Client, daResult *d eventData, _ := event.Data().(*settlement.EventDataNewBatchAccepted) if eventData.EndHeight != batch.EndHeight { c.logger.Debug("Received event for a different batch, ignoring.", "event", eventData) - continue + continue // continue waiting for acceptance of the current batch } c.logger.Info("Batch accepted.", "startHeight", batch.StartHeight, "endHeight", batch.EndHeight, "stateIndex", eventData.StateIndex) return nil case <-timer.C: // Check if the batch was accepted by the settlement layer, and we've just missed the event. 
- attempt++ - includedBatch, err := c.pollForBatchInclusion(batch.EndHeight, attempt) + includedBatch, err := c.pollForBatchInclusion(batch.EndHeight) + timer.Reset(c.batchAcceptanceTimeout) + // no error, but still not included if err == nil && !includedBatch { - // no error, but still not included - timer.Reset(c.batchAcceptanceTimeout) - continue + attempt++ + if attempt <= uint64(c.batchAcceptanceAttempts) { + continue // continue waiting for acceptance of the current batch + } + c.logger.Error( + "Timed out waiting for batch inclusion on settlement layer", + "startHeight", + batch.StartHeight, + "endHeight", + batch.EndHeight, + ) + break // exits the select; the break that follows the select then leaves the wait loop and goes back to the broadcast loop } - // all good - if err == nil { - c.logger.Info("Batch accepted", "startHeight", batch.StartHeight, "endHeight", batch.EndHeight) - return nil + if err != nil { + c.logger.Error( + "Wait for batch inclusion", + "startHeight", + batch.StartHeight, + "endHeight", + batch.EndHeight, + "error", + err, + ) + continue // continue waiting for acceptance of the current batch } + // all good + c.logger.Info("Batch accepted", "startHeight", batch.StartHeight, "endHeight", batch.EndHeight) + return nil } - // If errored polling, start again the submission loop. - c.logger.Error( - "Wait for batch inclusion", - "startHeight", - batch.StartHeight, - "endHeight", - batch.EndHeight, - "error", - err, - ) - break + break // failed waiting for acceptance. 
broadcast the batch again } } } @@ -348,9 +367,15 @@ func (c *Client) GetSequencers() ([]*types.Sequencer, error) { func (c *Client) broadcastBatch(msgUpdateState *rollapptypes.MsgUpdateState) error { txResp, err := c.cosmosClient.BroadcastTx(c.config.DymAccountName, msgUpdateState) - if err != nil || txResp.Code != 0 { + if err != nil { + if strings.Contains(err.Error(), rollapptypes.ErrWrongBlockHeight.Error()) { + err = fmt.Errorf("%w: %w", err, gerr.ErrAlreadyExist) + } return fmt.Errorf("broadcast tx: %w", err) } + if txResp.Code != 0 { + return fmt.Errorf("broadcast tx status code is not 0: %w", gerr.ErrUnknown) + } return nil } @@ -448,16 +473,11 @@ func (c *Client) getEventData(eventType string, rawEventData ctypes.ResultEvent) } // pollForBatchInclusion polls the hub for the inclusion of a batch with the given end height. -func (c *Client) pollForBatchInclusion(batchEndHeight, attempt uint64) (bool, error) { +func (c *Client) pollForBatchInclusion(batchEndHeight uint64) (bool, error) { latestBatch, err := c.GetLatestBatch() if err != nil { return false, fmt.Errorf("get latest batch: %w", err) } - // no error, but still not included - if attempt >= uint64(c.batchAcceptanceAttempts) { - return false, fmt.Errorf("timed out waiting for batch inclusion on settlement layer") - } - return latestBatch.Batch.EndHeight == batchEndHeight, nil } diff --git a/settlement/dymension/dymension_test.go b/settlement/dymension/dymension_test.go index da0d5b9fa..8e78ec2e1 100644 --- a/settlement/dymension/dymension_test.go +++ b/settlement/dymension/dymension_test.go @@ -68,6 +68,76 @@ func TestGetSequencers(t *testing.T) { require.Len(sequencers, count) } +// TestPostBatchRPCError tests the scenario where the batch was submitted but the acceptance signal failed.
+func TestPostBatchRPCError(t *testing.T) { + require := require.New(t) + pubsubServer := pubsub.NewServer() + err := pubsubServer.Start() + require.NoError(err) + + // Create a mock cosmos client + cosmosClientMock := dymensionmock.NewMockCosmosClient(t) + sequencerQueryClientMock := sequencertypesmock.NewMockQueryClient(t) + rollappQueryClientMock := rollapptypesmock.NewMockQueryClient(t) + cosmosClientMock.On("GetRollappClient").Return(rollappQueryClientMock) + cosmosClientMock.On("GetSequencerClient").Return(sequencerQueryClientMock) + accountPubkey, err := sdkcodectypes.NewAnyWithValue(secp256k1.GenPrivKey().PubKey()) + require.NoError(err) + cosmosClientMock.On("GetAccount", mock.Anything).Return(cosmosaccount.Account{Record: &keyring.Record{PubKey: accountPubkey}}, nil) + + options := []settlement.Option{ + dymension.WithCosmosClient(cosmosClientMock), + dymension.WithBatchAcceptanceTimeout(time.Millisecond * 300), + dymension.WithBatchAcceptanceAttempts(2), + } + + // Create a batch which will be submitted + proposerKey, _, err := crypto.GenerateEd25519Key(nil) + require.NoError(err) + batch, err := testutil.GenerateBatch(2, 2, proposerKey) + require.NoError(err) + + hubClient := dymension.Client{} + err = hubClient.Init(settlement.Config{}, pubsubServer, log.TestingLogger(), options...) 
+ require.NoError(err) + + // submit passes + cosmosClientMock.On("BroadcastTx", mock.Anything, mock.Anything).Return(cosmosclient.Response{TxResponse: &types.TxResponse{Code: 0}}, nil).Once() + + // return old batch for inclusion check + daMetaData := &da.DASubmitMetaData{ + Height: 1, + Client: da.Mock, + } + rollappQueryClientMock.On("StateInfo", mock.Anything, mock.Anything).Return( + &rollapptypes.QueryGetStateInfoResponse{StateInfo: rollapptypes.StateInfo{ + StartHeight: 1, StateInfoIndex: rollapptypes.StateInfoIndex{Index: 1}, DAPath: daMetaData.ToPath(), NumBlocks: 1, + }}, + nil).Times(2) + + // submit returns already exists error + submitBatchError := errors.New("rpc error: code = Unknown desc = rpc error: code = Unknown desc = failed to execute message; message index: 0: expected height (5), but received (4): start-height does not match rollapps state") + cosmosClientMock.On("BroadcastTx", mock.Anything, mock.Anything).Return(cosmosclient.Response{TxResponse: &types.TxResponse{Code: 1}}, submitBatchError).Once() + + resultSubmitBatch := &da.ResultSubmitBatch{} + resultSubmitBatch.SubmitMetaData = &da.DASubmitMetaData{} + errChan := make(chan error, 1) // Create a channel to receive an error from the goroutine + // Post the batch in a goroutine and capture any error. + go func() { + err := hubClient.SubmitBatch(batch, da.Mock, resultSubmitBatch) + errChan <- err // Send any error to the errChan + }() + + // Use a select statement to wait for a potential error or a timeout. + select { + case err = <-errChan: + case <-time.After(3 * time.Second): + err = errors.New("timeout") + } + assert.NoError(t, err) +} + // TestPostBatch should test the following: // 1. Batch fails to submit emits unhealthy event // 2. 
Batch is submitted successfully but not accepted which emits unhealthy event @@ -95,7 +165,6 @@ func TestPostBatch(t *testing.T) { cosmosClientMock.On("StartEventListener").Return(nil) cosmosClientMock.On("EventListenerQuit").Return(make(<-chan struct{})) batchAcceptedCh := make(chan coretypes.ResultEvent, 1) - require.NoError(err) cosmosClientMock.On("SubscribeToEvents", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return((<-chan coretypes.ResultEvent)(batchAcceptedCh), nil) options := []settlement.Option{ diff --git a/settlement/dymension/options.go b/settlement/dymension/options.go index b3d84e91e..94ffa07c3 100644 --- a/settlement/dymension/options.go +++ b/settlement/dymension/options.go @@ -30,6 +30,14 @@ func WithBatchAcceptanceTimeout(batchAcceptanceTimeout time.Duration) settlement } } +// WithBatchAcceptanceAttempts is an option that sets the number of attempts to check if a batch has been accepted by the settlement layer. +func WithBatchAcceptanceAttempts(batchAcceptanceAttempts uint) settlement.Option { + return func(c settlement.ClientI) { + dlc, _ := c.(*Client) + dlc.batchAcceptanceAttempts = batchAcceptanceAttempts + } +} + // WithRetryMinDelay is an option that sets the retry function mindelay between hub retry attempts. func WithRetryMinDelay(retryMinDelay time.Duration) settlement.Option { return func(c settlement.ClientI) {