Skip to content

Commit

Permalink
fix: dymint out of sync in case of missed hub state update event (#384)
Browse files Browse the repository at this point in the history
  • Loading branch information
omritoptix authored Jun 29, 2023
1 parent cc439f3 commit 6c190da
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 42 deletions.
37 changes: 37 additions & 0 deletions settlement/dymension/dymension.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,24 @@ WaitForBatchAcceptance:
map[string][]string{settlement.EventTypeKey: {settlement.EventSettlementHealthStatus}})
return
case <-ticker.C:
// Before emitting unhealthy event, check if the batch was accepted by the settlement layer and
// we've just missed the event.
includedBatch, err := d.waitForBatchInclusion(batch.StartHeight)
if err == nil {
d.logger.Debug("Batch accepted by settlement layer. Emitting events")
// Emit batch accepted event
batchAcceptedEvent := &settlement.EventDataNewSettlementBatchAccepted{
EndHeight: includedBatch.EndHeight,
StateIndex: includedBatch.StateIndex,
}
utils.SubmitEventOrPanic(d.ctx, d.pubsub, batchAcceptedEvent,
map[string][]string{settlement.EventTypeKey: {settlement.EventNewSettlementBatchAccepted}})
// Emit health event
heatlhEventData := settlement.EventDataSettlementHealthStatus{Healthy: true}
utils.SubmitEventOrPanic(d.ctx, d.pubsub, heatlhEventData,
map[string][]string{settlement.EventTypeKey: {settlement.EventSettlementHealthStatus}})
return
}
// Batch was not accepted by the settlement layer. Emitting unhealthy event
d.logger.Debug("Batch not accepted by settlement layer. Emitting unhealthy event",
"startHeight", batch.StartHeight, "endHeight", batch.EndHeight)
Expand Down Expand Up @@ -464,3 +482,22 @@ func (d *HubClient) convertStateInfoToResultRetrieveBatch(stateInfo *rollapptype
BaseResult: settlement.BaseResult{Code: settlement.StatusSuccess, StateIndex: stateInfo.StateInfoIndex.Index},
Batch: batchResult}, nil
}

// waitForBatchInclusion repeatedly fetches the latest batch for the configured rollapp and
// succeeds once that batch's start height equals batchStartHeight. The polling is driven by
// retry.Do, configured with d.ctx, d.batchRetryDelay and d.batchRetryAttempts, and reports only
// the last error. The returned result is non-nil only when a matching batch was found.
//
// TODO(omritoptix): Change the retry attempts to be only for the batch polling. Also we need to have a more
// bullet proof check as theoretically the tx can stay in the mempool longer than our retry attempts.
func (d *HubClient) waitForBatchInclusion(batchStartHeight uint64) (*settlement.ResultRetrieveBatch, error) {
	var included *settlement.ResultRetrieveBatch

	// pollLatestBatch is a single polling attempt: any error it returns triggers another retry.
	pollLatestBatch := func() error {
		latest, err := d.GetLatestBatch(d.config.RollappID)
		if err != nil {
			return err
		}
		// The latest batch on the settlement layer is not the one we are waiting for.
		if latest.Batch.StartHeight != batchStartHeight {
			return settlement.ErrBatchNotFound
		}
		included = latest
		return nil
	}

	err := retry.Do(
		pollLatestBatch,
		retry.Context(d.ctx),
		retry.LastErrorOnly(true),
		retry.Delay(d.batchRetryDelay),
		retry.Attempts(d.batchRetryAttempts),
	)
	return included, err
}
168 changes: 126 additions & 42 deletions settlement/dymension/dymension_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"

Expand All @@ -20,6 +22,7 @@ import (
coretypes "github.com/tendermint/tendermint/rpc/core/types"

"github.com/dymensionxyz/cosmosclient/cosmosclient"
rollapptypes "github.com/dymensionxyz/dymension/x/rollapp/types"
sequencertypes "github.com/dymensionxyz/dymension/x/sequencer/types"
"github.com/dymensionxyz/dymint/da"
"github.com/dymensionxyz/dymint/log/test"
Expand Down Expand Up @@ -61,7 +64,8 @@ func TestGetSequencers(t *testing.T) {
// TestPostBatch should test the following:
// 1. Batch fails to submit emits unhealthy event
// 2. Batch is submitted successfully but not accepted which emits unhealthy event
// 3. Batch is submitted successfully and accepted which emits healthy event
// 3. Batch is submitted successfully, hub event not emitted, but checking for inclusion succeeds
// 4. Batch is submitted successfully and accepted by catching hub event
func TestPostBatch(t *testing.T) {
require := require.New(t)

Expand All @@ -71,7 +75,8 @@ func TestPostBatch(t *testing.T) {
// Create a mock cosmos client
cosmosClientMock := mocks.NewCosmosClient(t)
sequencerQueryClientMock := settlementmocks.NewSequencerQueryClient(t)
cosmosClientMock.On("GetRollappClient").Return(settlementmocks.NewRollAppQueryClient(t))
rollappQueryClientMock := settlementmocks.NewRollAppQueryClient(t)
cosmosClientMock.On("GetRollappClient").Return(rollappQueryClientMock)
cosmosClientMock.On("GetSequencerClient").Return(sequencerQueryClientMock)
submitBatchError := errors.New("failed to submit batch")
accountPubkey, err := sdkcodectypes.NewAnyWithValue(secp256k1.GenPrivKey().PubKey())
Expand All @@ -94,83 +99,162 @@ func TestPostBatch(t *testing.T) {
// Create a batch which will be submitted
propserKey, _, err := crypto.GenerateEd25519Key(nil)
require.NoError(err)
batch, err := testutil.GenerateBatch(0, 1, propserKey)
batch, err := testutil.GenerateBatch(1, 1, propserKey)
require.NoError(err)

// Subscribe to the health status event
HealthSubscription, err := pubsubServer.Subscribe(context.Background(), "testPostBatch", settlement.EventQuerySettlementHealthStatus)
assert.NoError(t, err)

// Subscribe to the batch accepted event
BatchAcceptedSubscription, err := pubsubServer.Subscribe(context.Background(), "testPostBatch", settlement.EventQueryNewSettlementBatchAccepted)
assert.NoError(t, err)

cases := []struct {
name string
isBatchSubmitSuccess bool
isBatchAccepted bool
expectedHealthStatus bool
expectedError error
name string
isBatchSubmitSuccess bool
isBatchAcceptedHubEvent bool
shouldMockBatchIncluded bool
isBatchIncludedSuccess bool
expectedBatchAcceptedEvent bool
expectedHealthEventValue bool
expectedError error
}{
{
name: "TestSubmitBatchFailure",
isBatchSubmitSuccess: false,
isBatchAccepted: false,
expectedHealthStatus: false,
expectedError: submitBatchError,
name: "TestSubmitBatchFailure",
isBatchSubmitSuccess: false,
isBatchAcceptedHubEvent: false,
shouldMockBatchIncluded: true,
isBatchIncludedSuccess: false,
expectedHealthEventValue: false,
expectedBatchAcceptedEvent: false,
expectedError: submitBatchError,
},
{
name: "TestSubmitBatchSuccessNotAccepted",
isBatchSubmitSuccess: true,
isBatchAccepted: false,
expectedHealthStatus: false,
expectedError: settlement.ErrBatchNotAccepted,
name: "TestSubmitBatchSuccessNoBatchAcceptedHubEventNotIncluded",
isBatchSubmitSuccess: true,
isBatchAcceptedHubEvent: false,
shouldMockBatchIncluded: true,
isBatchIncludedSuccess: false,
expectedHealthEventValue: false,
expectedBatchAcceptedEvent: false,
expectedError: settlement.ErrBatchNotAccepted,
},
{
name: "TestSubmitBatchSuccessAndAccepted",
isBatchSubmitSuccess: true,
isBatchAccepted: true,
expectedHealthStatus: true,
expectedError: nil,
name: "TestSubmitBatchSuccessNotAcceptedYesIncluded",
isBatchSubmitSuccess: true,
isBatchAcceptedHubEvent: false,
shouldMockBatchIncluded: true,
isBatchIncludedSuccess: true,
expectedHealthEventValue: true,
expectedBatchAcceptedEvent: true,
},
{
name: "TestSubmitBatchSuccessAndAccepted",
isBatchSubmitSuccess: true,
isBatchAcceptedHubEvent: true,
shouldMockBatchIncluded: false,
expectedHealthEventValue: true,
expectedError: nil,
expectedBatchAcceptedEvent: true,
},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
done := make(chan bool)
// Init the wait group and set the number of expected events
var wg sync.WaitGroup
var eventsCount int
if c.expectedBatchAcceptedEvent {
eventsCount = 2
} else {
eventsCount = 1
}
wg.Add(eventsCount)
// Reset the mock functions
testutil.UnsetMockFn(cosmosClientMock.On("BroadcastTx"))
testutil.UnsetMockFn(rollappQueryClientMock.On("LatestStateIndex"))
testutil.UnsetMockFn(rollappQueryClientMock.On("StateInfo"))
// Set the mock logic based on the test case
if !c.isBatchSubmitSuccess {
cosmosClientMock.On("BroadcastTx", mock.Anything, mock.Anything).Return(cosmosclient.Response{TxResponse: &types.TxResponse{Code: 1}}, submitBatchError)
} else {
cosmosClientMock.On("BroadcastTx", mock.Anything, mock.Anything).Return(cosmosclient.Response{TxResponse: &types.TxResponse{Code: 0}}, nil)
}
if c.shouldMockBatchIncluded {
if c.isBatchIncludedSuccess {
rollappQueryClientMock.On("LatestStateIndex", mock.Anything, mock.Anything).Return(
&rollapptypes.QueryGetLatestStateIndexResponse{StateIndex: rollapptypes.StateInfoIndex{Index: 1}}, nil)
daMetaData := &settlement.DAMetaData{
Height: 1,
Client: da.Mock,
}
rollappQueryClientMock.On("StateInfo", mock.Anything, mock.Anything).Return(
&rollapptypes.QueryGetStateInfoResponse{StateInfo: rollapptypes.StateInfo{
StartHeight: batch.StartHeight, StateInfoIndex: rollapptypes.StateInfoIndex{Index: 1}, DAPath: daMetaData.ToPath(), NumBlocks: 1}},
nil)
} else {
rollappQueryClientMock.On("LatestStateIndex", mock.Anything, mock.Anything).Return(nil, nil)
}
}
hubClient, err := newDymensionHubClient(settlement.Config{}, pubsubServer, test.NewLogger(t), options...)
require.NoError(err)
hubClient.Start()
// Handle the various events that are emitted and timeout if we don't get them
var eventsReceivedCount int64
go func() {
select {
case healthEvent := <-HealthSubscription.Out():
t.Logf("got health event: %v", healthEvent)
healthStatusEvent := healthEvent.Data().(settlement.EventDataSettlementHealthStatus)
assert.Equal(t, c.expectedHealthEventValue, healthStatusEvent.Healthy)
assert.Equal(t, c.expectedError, healthStatusEvent.Error)
wg.Done()
atomic.AddInt64(&eventsReceivedCount, 1)
return
}
}()
if c.expectedBatchAcceptedEvent {
go func() {
select {
case batchAcceptedEvent := <-BatchAcceptedSubscription.Out():
t.Logf("got batch accepted event: %v", batchAcceptedEvent)
batchAcceptedEventData := batchAcceptedEvent.Data().(*settlement.EventDataNewSettlementBatchAccepted)
assert.Equal(t, batchAcceptedEventData.EndHeight, batch.EndHeight)
wg.Done()
atomic.AddInt64(&eventsReceivedCount, 1)
return
}
}()
}
go func() {
select {
case <-time.After(2 * time.Second):
eventsRecievedCount := int(atomic.LoadInt64(&eventsReceivedCount))
if eventsRecievedCount < eventsCount {
t.Error("Didn't recieve all events expected")
for i := 0; i < eventsCount-eventsRecievedCount; i++ {
wg.Done()
}
}
return
}
}()
// Post the batch
go hubClient.PostBatch(batch, da.Mock, &da.ResultSubmitBatch{})
// Wait for the batch to be submitted and submit an event notifying that the batch was accepted
time.Sleep(50 * time.Millisecond)
if c.isBatchAccepted {
if c.isBatchAcceptedHubEvent {
batchAcceptedCh <- coretypes.ResultEvent{
Query: fmt.Sprintf("state_update.rollapp_id='%s'", ""),
Events: map[string][]string{
"state_update.num_blocks": {"1"},
"state_update.start_height": {"0"},
"state_update.start_height": {"1"},
"state_update.state_info_index": {"1"},
},
}
}
go func() {
select {
case event := <-HealthSubscription.Out():
healthStatusEvent := event.Data().(settlement.EventDataSettlementHealthStatus)
assert.Equal(t, c.expectedHealthStatus, healthStatusEvent.Healthy)
assert.Equal(t, c.expectedError, healthStatusEvent.Error)
done <- true
break
case <-time.After(1 * time.Second):
t.Error("expected health status event but didn't get one")
done <- true
break
}
}()
<-done
// Wait for all events to be handled
wg.Wait()
// Stop the hub client and wait for it to stop
hubClient.Stop()
time.Sleep(1 * time.Second)
Expand Down
1 change: 1 addition & 0 deletions settlement/settlement.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type BaseResult struct {
Code StatusCode
// Message may contain settlement layer specific information (like detailed error message, etc)
Message string
//TODO(omritoptix): Move StateIndex to be part of the batch struct
// StateIndex is the rollapp-specific index under which the batch was saved in the settlement layer (SL)
StateIndex uint64
}
Expand Down

0 comments on commit 6c190da

Please sign in to comment.