Skip to content

Commit

Permalink
fix(settlement): hub disconnect and reconnect causes sequencer to sen…
Browse files Browse the repository at this point in the history
…d wrong batch (#889)

Co-authored-by: Daniel T <[email protected]>
  • Loading branch information
2 people authored and omritoptix committed Jun 3, 2024
1 parent 445d1ec commit 7cee1e8
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 35 deletions.
4 changes: 2 additions & 2 deletions node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ func TestMempoolDirectly(t *testing.T) {
RPC: config.RPCConfig{},
MempoolConfig: *tmcfg.DefaultMempoolConfig(),
BlockManagerConfig: config.BlockManagerConfig{
BlockTime: 100 * time.Millisecond,
BlockTime: 1 * time.Second,
BatchSubmitMaxTime: 60 * time.Second,
BlockBatchMaxSizeBytes: 1000,
BlockBatchMaxSizeBytes: 100000,
MaxSupportedBatchSkew: 10,
},
DALayer: "mock",
Expand Down
84 changes: 52 additions & 32 deletions settlement/dymension/dymension.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package dymension

import (
"context"
"errors"
"fmt"
"strings"
"time"

"github.com/dymensionxyz/dymint/gerr"
Expand Down Expand Up @@ -153,13 +155,15 @@ func (c *Client) SubmitBatch(batch *types.Batch, daClient da.Client, daResult *d
//nolint:errcheck
defer c.pubsub.Unsubscribe(c.ctx, postBatchSubscriberClient, settlement.EventQueryNewSettlementBatchAccepted)

// Try submitting the batch to the settlement layer:
// 1. broadcast the transaction to the blockchain (with retries).
// 2. wait for the batch to be accepted by the settlement layer.
for {
// broadcast loop: broadcast the transaction to the blockchain (with infinite retries).
err := c.RunWithRetryInfinitely(func() error {
err := c.broadcastBatch(msgUpdateState)
if err != nil {
if errors.Is(err, gerr.ErrAlreadyExist) {
return retry.Unrecoverable(err)
}

c.logger.Error(
"Submit batch",
"startHeight",
Expand All @@ -173,13 +177,18 @@ func (c *Client) SubmitBatch(batch *types.Batch, daClient da.Client, daResult *d
return err
})
if err != nil {
// this could happen if we timed-out waiting for acceptance in the previous iteration, but the batch was indeed submitted
if errors.Is(err, gerr.ErrAlreadyExist) {
c.logger.Debug("Batch already accepted", "startHeight", batch.StartHeight, "endHeight", batch.EndHeight)
return nil
}
return fmt.Errorf("broadcast batch: %w", err)
}

// Batch was submitted successfully. Wait for it to be accepted by the settlement layer.
timer := time.NewTimer(c.batchAcceptanceTimeout)
defer timer.Stop()
attempt := uint64(0)
attempt := uint64(1)

for {
select {
Expand All @@ -193,37 +202,47 @@ func (c *Client) SubmitBatch(batch *types.Batch, daClient da.Client, daResult *d
eventData, _ := event.Data().(*settlement.EventDataNewBatchAccepted)
if eventData.EndHeight != batch.EndHeight {
c.logger.Debug("Received event for a different batch, ignoring.", "event", eventData)
continue
continue // continue waiting for acceptance of the current batch
}
c.logger.Info("Batch accepted.", "startHeight", batch.StartHeight, "endHeight", batch.EndHeight, "stateIndex", eventData.StateIndex)
return nil

case <-timer.C:
// Check if the batch was accepted by the settlement layer, and we've just missed the event.
attempt++
includedBatch, err := c.pollForBatchInclusion(batch.EndHeight, attempt)
includedBatch, err := c.pollForBatchInclusion(batch.EndHeight)
timer.Reset(c.batchAcceptanceTimeout)
// no error, but still not included
if err == nil && !includedBatch {
// no error, but still not included
timer.Reset(c.batchAcceptanceTimeout)
continue
attempt++
if attempt <= uint64(c.batchAcceptanceAttempts) {
continue // continue waiting for acceptance of the current batch
}
c.logger.Error(
"Timed out waiting for batch inclusion on settlement layer",
"startHeight",
batch.StartHeight,
"endHeight",
batch.EndHeight,
)
break // breaks the switch case, and goes back to the broadcast loop
}
// all good
if err == nil {
c.logger.Info("Batch accepted", "startHeight", batch.StartHeight, "endHeight", batch.EndHeight)
return nil
if err != nil {
c.logger.Error(
"Wait for batch inclusion",
"startHeight",
batch.StartHeight,
"endHeight",
batch.EndHeight,
"error",
err,
)
continue // continue waiting for acceptance of the current batch
}
// all good
c.logger.Info("Batch accepted", "startHeight", batch.StartHeight, "endHeight", batch.EndHeight)
return nil
}
// If errored polling, start again the submission loop.
c.logger.Error(
"Wait for batch inclusion",
"startHeight",
batch.StartHeight,
"endHeight",
batch.EndHeight,
"error",
err,
)
break
break // failed waiting for acceptance. broadcast the batch again
}
}
}
Expand Down Expand Up @@ -348,9 +367,15 @@ func (c *Client) GetSequencers() ([]*types.Sequencer, error) {

func (c *Client) broadcastBatch(msgUpdateState *rollapptypes.MsgUpdateState) error {
txResp, err := c.cosmosClient.BroadcastTx(c.config.DymAccountName, msgUpdateState)
if err != nil || txResp.Code != 0 {
if err != nil {
if strings.Contains(err.Error(), rollapptypes.ErrWrongBlockHeight.Error()) {
err = fmt.Errorf("%w: %w", err, gerr.ErrAlreadyExist)
}
return fmt.Errorf("broadcast tx: %w", err)
}
if txResp.Code != 0 {
return fmt.Errorf("broadcast tx status code is not 0: %w", gerr.ErrUnknown)
}
return nil
}

Expand Down Expand Up @@ -448,16 +473,11 @@ func (c *Client) getEventData(eventType string, rawEventData ctypes.ResultEvent)
}

// pollForBatchInclusion polls the hub for the inclusion of a batch with the given end height.
func (c *Client) pollForBatchInclusion(batchEndHeight, attempt uint64) (bool, error) {
func (c *Client) pollForBatchInclusion(batchEndHeight uint64) (bool, error) {
latestBatch, err := c.GetLatestBatch()
if err != nil {
return false, fmt.Errorf("get latest batch: %w", err)
}

// no error, but still not included
if attempt >= uint64(c.batchAcceptanceAttempts) {
return false, fmt.Errorf("timed out waiting for batch inclusion on settlement layer")
}

return latestBatch.Batch.EndHeight == batchEndHeight, nil
}
71 changes: 70 additions & 1 deletion settlement/dymension/dymension_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,76 @@ func TestGetSequencers(t *testing.T) {
require.Len(sequencers, count)
}

// TestPostBatchRPCError should tests the scenario where batch submitted but the acceptance signal failed.
// we expect an attempt to broadcast the batch again, and receive "already submitted" error response.
func TestPostBatchRPCError(t *testing.T) {
require := require.New(t)
pubsubServer := pubsub.NewServer()
err := pubsubServer.Start()
require.NoError(err)

// Create a mock cosmos client
cosmosClientMock := dymensionmock.NewMockCosmosClient(t)
sequencerQueryClientMock := sequencertypesmock.NewMockQueryClient(t)
rollappQueryClientMock := rollapptypesmock.NewMockQueryClient(t)
cosmosClientMock.On("GetRollappClient").Return(rollappQueryClientMock)
cosmosClientMock.On("GetSequencerClient").Return(sequencerQueryClientMock)
accountPubkey, err := sdkcodectypes.NewAnyWithValue(secp256k1.GenPrivKey().PubKey())
require.NoError(err)
cosmosClientMock.On("GetAccount", mock.Anything).Return(cosmosaccount.Account{Record: &keyring.Record{PubKey: accountPubkey}}, nil)

options := []settlement.Option{
dymension.WithCosmosClient(cosmosClientMock),
dymension.WithBatchAcceptanceTimeout(time.Millisecond * 300),
dymension.WithBatchAcceptanceAttempts(2),
}

// Create a batch which will be submitted
proposerKey, _, err := crypto.GenerateEd25519Key(nil)
require.NoError(err)
batch, err := testutil.GenerateBatch(2, 2, proposerKey)
require.NoError(err)

hubClient := dymension.Client{}
err = hubClient.Init(settlement.Config{}, pubsubServer, log.TestingLogger(), options...)
require.NoError(err)

// submit passes
cosmosClientMock.On("BroadcastTx", mock.Anything, mock.Anything).Return(cosmosclient.Response{TxResponse: &types.TxResponse{Code: 0}}, nil).Once()

// return old batch for inclusion check
daMetaData := &da.DASubmitMetaData{
Height: 1,
Client: da.Mock,
}
rollappQueryClientMock.On("StateInfo", mock.Anything, mock.Anything).Return(
&rollapptypes.QueryGetStateInfoResponse{StateInfo: rollapptypes.StateInfo{
StartHeight: 1, StateInfoIndex: rollapptypes.StateInfoIndex{Index: 1}, DAPath: daMetaData.ToPath(), NumBlocks: 1,
}},
nil).Times(2)

// submit returns already exists error
submitBatchError := errors.New("rpc error: code = Unknown desc = rpc error: code = Unknown desc = failed to execute message; message index: 0: expected height (5), but received (4): start-height does not match rollapps state")
cosmosClientMock.On("BroadcastTx", mock.Anything, mock.Anything).Return(cosmosclient.Response{TxResponse: &types.TxResponse{Code: 1}}, submitBatchError).Once()

resultSubmitBatch := &da.ResultSubmitBatch{}
resultSubmitBatch.SubmitMetaData = &da.DASubmitMetaData{}
errChan := make(chan error, 1) // Create a channel to receive an error from the goroutine
// Post the batch in a goroutine and capture any error.
go func() {
err := hubClient.SubmitBatch(batch, da.Mock, resultSubmitBatch)
errChan <- err // Send any error to the errChan
}()

// Use a select statement to wait for a potential error or a timeout.
select {
case err = <-errChan:
case <-time.After(3 * time.Second):
err = errors.New("timeout")
}
assert.NoError(t, err)
}

// TestPostBatch should test the following:
// 1. Batch fails to submit emits unhealthy event
// 2. Batch is submitted successfully but not accepted which emits unhealthy event
Expand Down Expand Up @@ -95,7 +165,6 @@ func TestPostBatch(t *testing.T) {
cosmosClientMock.On("StartEventListener").Return(nil)
cosmosClientMock.On("EventListenerQuit").Return(make(<-chan struct{}))
batchAcceptedCh := make(chan coretypes.ResultEvent, 1)
require.NoError(err)
cosmosClientMock.On("SubscribeToEvents", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return((<-chan coretypes.ResultEvent)(batchAcceptedCh), nil)

options := []settlement.Option{
Expand Down
8 changes: 8 additions & 0 deletions settlement/dymension/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ func WithBatchAcceptanceTimeout(batchAcceptanceTimeout time.Duration) settlement
}
}

// WithBatchAcceptanceAttempts is an option that sets the number of attempts to check if a batch has been accepted by the settlement layer.
func WithBatchAcceptanceAttempts(batchAcceptanceAttempts uint) settlement.Option {
return func(c settlement.ClientI) {
dlc, _ := c.(*Client)
dlc.batchAcceptanceAttempts = batchAcceptanceAttempts
}
}

// WithRetryMinDelay is an option that sets the retry function mindelay between hub retry attempts.
func WithRetryMinDelay(retryMinDelay time.Duration) settlement.Option {
return func(c settlement.ClientI) {
Expand Down

0 comments on commit 7cee1e8

Please sign in to comment.