Skip to content

Commit

Permalink
fix(manager): sending wrong batch height if restarting right after su…
Browse files Browse the repository at this point in the history
…bmission (#1074)

Co-authored-by: Omri <[email protected]>
  • Loading branch information
srene and omritoptix authored Sep 16, 2024
1 parent 2da988f commit d51b961
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 75 deletions.
3 changes: 3 additions & 0 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,9 @@ func (m *Manager) Start(ctx context.Context) error {
}

/* ----------------------------- sequencer mode ----------------------------- */
// Subscribe to batch events to update the last submitted height in case a batch confirmation was lost. This can happen if the sequencer crashed/restarted just after submitting a batch to the settlement layer and, by the time we query the last batch, this batch hasn't been accepted yet.
go uevent.MustSubscribe(ctx, m.Pubsub, "updateSubmittedHeightLoop", settlement.EventQueryNewSettlementBatchAccepted, m.UpdateLastSubmittedHeight, m.logger)

// Sequencer must wait till DA is synced to start submitting blobs
<-m.DAClient.Synced()
err = m.syncFromSettlement()
Expand Down
23 changes: 23 additions & 0 deletions block/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ import (
"time"

"github.com/dymensionxyz/gerr-cosmos/gerrc"
"github.com/tendermint/tendermint/libs/pubsub"
"golang.org/x/sync/errgroup"

"github.com/dymensionxyz/dymint/da"
"github.com/dymensionxyz/dymint/settlement"
"github.com/dymensionxyz/dymint/types"
uatomic "github.com/dymensionxyz/dymint/utils/atomic"
uchannel "github.com/dymensionxyz/dymint/utils/channel"
Expand Down Expand Up @@ -115,6 +117,12 @@ func SubmitLoopInner(
logger.Error("Create and submit batch", "err", err, "pending", pending)
panic(err)
}
// This can happen if we timed out waiting for acceptance in the previous iteration, but the batch was in fact submitted.
// We panic here because restarting may reset the last submitted batch counter, after which the sequencer can potentially resume submitting batches.
if errors.Is(err, gerrc.ErrAlreadyExists) {
logger.Debug("Batch already accepted", "err", err, "pending", pending)
panic(err)
}
return err
}
timeLastSubmission = time.Now()
Expand Down Expand Up @@ -224,6 +232,7 @@ func (m *Manager) SubmitBatch(batch *types.Batch) error {
if err != nil {
return fmt.Errorf("sl client submit batch: start height: %d: end height: %d: %w", batch.StartHeight(), batch.EndHeight(), err)
}

m.logger.Info("Submitted batch to SL.", "start height", batch.StartHeight(), "end height", batch.EndHeight())

types.RollappHubHeightGauge.Set(float64(batch.EndHeight()))
Expand Down Expand Up @@ -263,3 +272,17 @@ func (m *Manager) GetUnsubmittedBytes() int {
// GetUnsubmittedBlocks reports how many blocks lie between the last submitted
// height and the current state height.
// The result is a plain unsigned subtraction of the two heights.
func (m *Manager) GetUnsubmittedBlocks() uint64 {
	stateHeight := m.State.Height()
	submittedHeight := m.LastSubmittedHeight.Load()
	return stateHeight - submittedHeight
}

// UpdateLastSubmittedHeight advances the last submitted height to the end
// height carried by a batch-accepted settlement event.
// This may be necessary in case we crashed/restarted before getting a response
// for our submission to the settlement layer.
//
// The stored height only ever moves forward: an event whose end height is not
// greater than the current value is ignored. Events carrying anything other
// than *settlement.EventDataNewBatchAccepted are logged and dropped.
func (m *Manager) UpdateLastSubmittedHeight(event pubsub.Message) {
	eventData, ok := event.Data().(*settlement.EventDataNewBatchAccepted)
	if !ok {
		m.logger.Error("onReceivedBatch", "err", "wrong event data received")
		return
	}
	h := eventData.EndHeight

	// Use compare-and-swap rather than a separate Load followed by Store: the
	// height can also be stored from elsewhere between the two calls, and a
	// plain Store could then move the counter backwards.
	for {
		curr := m.LastSubmittedHeight.Load()
		if curr >= h {
			return
		}
		if m.LastSubmittedHeight.CompareAndSwap(curr, h) {
			return
		}
	}
}
5 changes: 0 additions & 5 deletions settlement/dymension/dymension.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,6 @@ func (c *Client) SubmitBatch(batch *types.Batch, daClient da.Client, daResult *d
return err
})
if err != nil {
// this could happen if we timed-out waiting for acceptance in the previous iteration, but the batch was indeed submitted
if errors.Is(err, gerrc.ErrAlreadyExists) {
c.logger.Debug("Batch already accepted", "startHeight", batch.StartHeight(), "endHeight", batch.EndHeight())
return nil
}
return fmt.Errorf("broadcast batch: %w", err)
}

Expand Down
70 changes: 0 additions & 70 deletions settlement/dymension/dymension_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,76 +67,6 @@ func TestGetSequencers(t *testing.T) {
require.Len(sequencers, count)
}

// TestPostBatchRPCError tests the scenario where a batch is submitted but the acceptance signal is never received.
// The first broadcast succeeds, the inclusion check keeps returning an old batch, and the second broadcast
// fails with a start-height mismatch error. SubmitBatch is still expected to return no error.
func TestPostBatchRPCError(t *testing.T) {
require := require.New(t)
// Pubsub server handed to the hub client on Init.
pubsubServer := pubsub.NewServer()
err := pubsubServer.Start()
require.NoError(err)

// Create a mock cosmos client
cosmosClientMock := dymensionmock.NewMockCosmosClient(t)
sequencerQueryClientMock := sequencertypesmock.NewMockQueryClient(t)
rollappQueryClientMock := rollapptypesmock.NewMockQueryClient(t)
cosmosClientMock.On("GetRollappClient").Return(rollappQueryClientMock)
cosmosClientMock.On("GetSequencerClient").Return(sequencerQueryClientMock)
// Any account lookup returns a record holding a freshly generated secp256k1 public key.
accountPubkey, err := sdkcodectypes.NewAnyWithValue(secp256k1.GenPrivKey().PubKey())
require.NoError(err)
cosmosClientMock.On("GetAccount", mock.Anything).Return(cosmosaccount.Account{Record: &keyring.Record{PubKey: accountPubkey}}, nil)

// Short acceptance timeout and two attempts so the whole flow fits well inside the 3s guard below.
options := []settlement.Option{
dymension.WithCosmosClient(cosmosClientMock),
dymension.WithBatchAcceptanceTimeout(time.Millisecond * 300),
dymension.WithBatchAcceptanceAttempts(2),
}

// Create a batch which will be submitted
proposerKey, _, err := crypto.GenerateEd25519Key(nil)
require.NoError(err)
batch, err := testutil.GenerateBatch(2, 2, proposerKey)
require.NoError(err)

hubClient := dymension.Client{}
err = hubClient.Init(settlement.Config{}, "rollappTest", pubsubServer, log.TestingLogger(), options...)
require.NoError(err)

// First broadcast succeeds (response code 0, nil error); registered for a single call only.
cosmosClientMock.On("BroadcastTx", mock.Anything, mock.Anything).Return(cosmosclient.Response{TxResponse: &types.TxResponse{Code: 0}}, nil).Once()

// Return an old batch (start height 1, one block) for the inclusion check, for two calls.
daMetaData := &da.DASubmitMetaData{
Height: 1,
Client: da.Mock,
}
rollappQueryClientMock.On("StateInfo", mock.Anything, mock.Anything).Return(
&rollapptypes.QueryGetStateInfoResponse{StateInfo: rollapptypes.StateInfo{
StartHeight: 1, StateInfoIndex: rollapptypes.StateInfoIndex{Index: 1}, DAPath: daMetaData.ToPath(), NumBlocks: 1,
}},
nil).Times(2)

// Second broadcast fails with a start-height mismatch error.
// NOTE(review): presumably the client maps this error text to "already exists" — confirm against the client code.
submitBatchError := errors.New("rpc error: code = Unknown desc = rpc error: code = Unknown desc = failed to execute message; message index: 0: expected height (5), but received (4): start-height does not match rollapps state")
cosmosClientMock.On("BroadcastTx", mock.Anything, mock.Anything).Return(cosmosclient.Response{TxResponse: &types.TxResponse{Code: 1}}, submitBatchError).Once()

resultSubmitBatch := &da.ResultSubmitBatch{}
resultSubmitBatch.SubmitMetaData = &da.DASubmitMetaData{}
errChan := make(chan error, 1) // Buffered so the goroutine can send its result even if the select below has already timed out.
// Submit the batch in a goroutine and capture any error.
go func() {
err := hubClient.SubmitBatch(batch, da.Mock, resultSubmitBatch)
errChan <- err // Send the result (possibly nil) to errChan
}()

// Wait for SubmitBatch to return, or fail with a synthetic "timeout" error after 3 seconds.
select {
case err = <-errChan:
case <-time.After(3 * time.Second):
err = errors.New("timeout")
}
// Passes only if SubmitBatch returned nil before the 3 second guard fired.
assert.NoError(t, err)
}

// TestPostBatch should test the following:
// 1. Batch fails to submit emits unhealthy event
// 2. Batch is submitted successfully but not accepted which emits unhealthy event
Expand Down

0 comments on commit d51b961

Please sign in to comment.