From 6c190dad28803dc67c9f34a4928af98b8fa28258 Mon Sep 17 00:00:00 2001 From: Omri Date: Thu, 29 Jun 2023 16:01:17 +0200 Subject: [PATCH] fix: dymint out of sync in case of missed hub state update event (#384) --- settlement/dymension/dymension.go | 37 ++++++ settlement/dymension/dymension_test.go | 168 ++++++++++++++++++------- settlement/settlement.go | 1 + 3 files changed, 164 insertions(+), 42 deletions(-) diff --git a/settlement/dymension/dymension.go b/settlement/dymension/dymension.go index fb9c3b10f..0562c5571 100644 --- a/settlement/dymension/dymension.go +++ b/settlement/dymension/dymension.go @@ -237,6 +237,24 @@ WaitForBatchAcceptance: map[string][]string{settlement.EventTypeKey: {settlement.EventSettlementHealthStatus}}) return case <-ticker.C: + // Before emitting unhealthy event, check if the batch was accepted by the settlement layer and + // we've just missed the event. + includedBatch, err := d.waitForBatchInclusion(batch.StartHeight) + if err == nil { + d.logger.Debug("Batch accepted by settlement layer. Emitting events") + // Emit batch accepted event + batchAcceptedEvent := &settlement.EventDataNewSettlementBatchAccepted{ + EndHeight: includedBatch.EndHeight, + StateIndex: includedBatch.StateIndex, + } + utils.SubmitEventOrPanic(d.ctx, d.pubsub, batchAcceptedEvent, + map[string][]string{settlement.EventTypeKey: {settlement.EventNewSettlementBatchAccepted}}) + // Emit health event + heatlhEventData := settlement.EventDataSettlementHealthStatus{Healthy: true} + utils.SubmitEventOrPanic(d.ctx, d.pubsub, heatlhEventData, + map[string][]string{settlement.EventTypeKey: {settlement.EventSettlementHealthStatus}}) + return + } // Batch was not accepted by the settlement layer. Emitting unhealthy event d.logger.Debug("Batch not accepted by settlement layer. 
Emitting unhealthy event", "startHeight", batch.StartHeight, "endHeight", batch.EndHeight) @@ -464,3 +482,22 @@ func (d *HubClient) convertStateInfoToResultRetrieveBatch(stateInfo *rollapptype BaseResult: settlement.BaseResult{Code: settlement.StatusSuccess, StateIndex: stateInfo.StateInfoIndex.Index}, Batch: batchResult}, nil } + +// TODO(omritoptix): Change the retry attempts to be only for the batch polling. Also we need to have a more +// bullet proof check as theoretically the tx can stay in the mempool longer than our retry attempts. +func (d *HubClient) waitForBatchInclusion(batchStartHeight uint64) (*settlement.ResultRetrieveBatch, error) { + var resultRetriveBatch *settlement.ResultRetrieveBatch + err := retry.Do(func() error { + latestBatch, err := d.GetLatestBatch(d.config.RollappID) + if err != nil { + return err + } + if latestBatch.Batch.StartHeight == batchStartHeight { + resultRetriveBatch = latestBatch + return nil + } + return settlement.ErrBatchNotFound + }, retry.Context(d.ctx), retry.LastErrorOnly(true), retry.Delay(d.batchRetryDelay), retry.Attempts(d.batchRetryAttempts)) + return resultRetriveBatch, err + +} diff --git a/settlement/dymension/dymension_test.go b/settlement/dymension/dymension_test.go index a216360f1..860220aec 100644 --- a/settlement/dymension/dymension_test.go +++ b/settlement/dymension/dymension_test.go @@ -4,6 +4,8 @@ import ( "context" "errors" "fmt" + "sync" + "sync/atomic" "testing" "time" @@ -20,6 +22,7 @@ import ( coretypes "github.com/tendermint/tendermint/rpc/core/types" "github.com/dymensionxyz/cosmosclient/cosmosclient" + rollapptypes "github.com/dymensionxyz/dymension/x/rollapp/types" sequencertypes "github.com/dymensionxyz/dymension/x/sequencer/types" "github.com/dymensionxyz/dymint/da" "github.com/dymensionxyz/dymint/log/test" @@ -61,7 +64,8 @@ func TestGetSequencers(t *testing.T) { // TestPostBatch should test the following: // 1. Batch fails to submit emits unhealthy event // 2. 
Batch is submitted successfully but not accepted which emits unhealthy event -// 3. Batch is submitted successfully and accepted which emits healthy event +// 3. Batch is submitted successfully, hub event not emitted, but checking for inclusion succeeds +// 4. Batch is submitted successfully and accepted by catching hub event func TestPostBatch(t *testing.T) { require := require.New(t) @@ -71,7 +75,8 @@ func TestPostBatch(t *testing.T) { // Create a mock cosmos client cosmosClientMock := mocks.NewCosmosClient(t) sequencerQueryClientMock := settlementmocks.NewSequencerQueryClient(t) - cosmosClientMock.On("GetRollappClient").Return(settlementmocks.NewRollAppQueryClient(t)) + rollappQueryClientMock := settlementmocks.NewRollAppQueryClient(t) + cosmosClientMock.On("GetRollappClient").Return(rollappQueryClientMock) cosmosClientMock.On("GetSequencerClient").Return(sequencerQueryClientMock) submitBatchError := errors.New("failed to submit batch") accountPubkey, err := sdkcodectypes.NewAnyWithValue(secp256k1.GenPrivKey().PubKey()) @@ -94,83 +99,162 @@ func TestPostBatch(t *testing.T) { // Create a batch which will be submitted propserKey, _, err := crypto.GenerateEd25519Key(nil) require.NoError(err) - batch, err := testutil.GenerateBatch(0, 1, propserKey) + batch, err := testutil.GenerateBatch(1, 1, propserKey) require.NoError(err) - // Subscribe to the health status event HealthSubscription, err := pubsubServer.Subscribe(context.Background(), "testPostBatch", settlement.EventQuerySettlementHealthStatus) assert.NoError(t, err) + // Subscribe to the batch accepted event + BatchAcceptedSubscription, err := pubsubServer.Subscribe(context.Background(), "testPostBatch", settlement.EventQueryNewSettlementBatchAccepted) + assert.NoError(t, err) + cases := []struct { - name string - isBatchSubmitSuccess bool - isBatchAccepted bool - expectedHealthStatus bool - expectedError error + name string + isBatchSubmitSuccess bool + isBatchAcceptedHubEvent bool + shouldMockBatchIncluded 
bool + isBatchIncludedSuccess bool + expectedBatchAcceptedEvent bool + expectedHealthEventValue bool + expectedError error }{ { - name: "TestSubmitBatchFailure", - isBatchSubmitSuccess: false, - isBatchAccepted: false, - expectedHealthStatus: false, - expectedError: submitBatchError, + name: "TestSubmitBatchFailure", + isBatchSubmitSuccess: false, + isBatchAcceptedHubEvent: false, + shouldMockBatchIncluded: true, + isBatchIncludedSuccess: false, + expectedHealthEventValue: false, + expectedBatchAcceptedEvent: false, + expectedError: submitBatchError, }, { - name: "TestSubmitBatchSuccessNotAccepted", - isBatchSubmitSuccess: true, - isBatchAccepted: false, - expectedHealthStatus: false, - expectedError: settlement.ErrBatchNotAccepted, + name: "TestSubmitBatchSuccessNoBatchAcceptedHubEventNotIncluded", + isBatchSubmitSuccess: true, + isBatchAcceptedHubEvent: false, + shouldMockBatchIncluded: true, + isBatchIncludedSuccess: false, + expectedHealthEventValue: false, + expectedBatchAcceptedEvent: false, + expectedError: settlement.ErrBatchNotAccepted, }, { - name: "TestSubmitBatchSuccessAndAccepted", - isBatchSubmitSuccess: true, - isBatchAccepted: true, - expectedHealthStatus: true, - expectedError: nil, + name: "TestSubmitBatchSuccessNotAcceptedYesIncluded", + isBatchSubmitSuccess: true, + isBatchAcceptedHubEvent: false, + shouldMockBatchIncluded: true, + isBatchIncludedSuccess: true, + expectedHealthEventValue: true, + expectedBatchAcceptedEvent: true, + }, + { + name: "TestSubmitBatchSuccessAndAccepted", + isBatchSubmitSuccess: true, + isBatchAcceptedHubEvent: true, + shouldMockBatchIncluded: false, + expectedHealthEventValue: true, + expectedError: nil, + expectedBatchAcceptedEvent: true, }, } for _, c := range cases { t.Run(c.name, func(t *testing.T) { - done := make(chan bool) + // Init the wait group and set the number of expected events + var wg sync.WaitGroup + var eventsCount int + if c.expectedBatchAcceptedEvent { + eventsCount = 2 + } else { + eventsCount = 
1 + } + wg.Add(eventsCount) + // Reset the mock functions testutil.UnsetMockFn(cosmosClientMock.On("BroadcastTx")) + testutil.UnsetMockFn(rollappQueryClientMock.On("LatestStateIndex")) + testutil.UnsetMockFn(rollappQueryClientMock.On("StateInfo")) + // Set the mock logic based on the test case if !c.isBatchSubmitSuccess { cosmosClientMock.On("BroadcastTx", mock.Anything, mock.Anything).Return(cosmosclient.Response{TxResponse: &types.TxResponse{Code: 1}}, submitBatchError) } else { cosmosClientMock.On("BroadcastTx", mock.Anything, mock.Anything).Return(cosmosclient.Response{TxResponse: &types.TxResponse{Code: 0}}, nil) } + if c.shouldMockBatchIncluded { + if c.isBatchIncludedSuccess { + rollappQueryClientMock.On("LatestStateIndex", mock.Anything, mock.Anything).Return( + &rollapptypes.QueryGetLatestStateIndexResponse{StateIndex: rollapptypes.StateInfoIndex{Index: 1}}, nil) + daMetaData := &settlement.DAMetaData{ + Height: 1, + Client: da.Mock, + } + rollappQueryClientMock.On("StateInfo", mock.Anything, mock.Anything).Return( + &rollapptypes.QueryGetStateInfoResponse{StateInfo: rollapptypes.StateInfo{ + StartHeight: batch.StartHeight, StateInfoIndex: rollapptypes.StateInfoIndex{Index: 1}, DAPath: daMetaData.ToPath(), NumBlocks: 1}}, + nil) + } else { + rollappQueryClientMock.On("LatestStateIndex", mock.Anything, mock.Anything).Return(nil, nil) + } + } hubClient, err := newDymensionHubClient(settlement.Config{}, pubsubServer, test.NewLogger(t), options...) 
require.NoError(err) hubClient.Start() + // Handle the various events that are emitted and timeout if we don't get them + var eventsReceivedCount int64 + go func() { + select { + case healthEvent := <-HealthSubscription.Out(): + t.Logf("got health event: %v", healthEvent) + healthStatusEvent := healthEvent.Data().(settlement.EventDataSettlementHealthStatus) + assert.Equal(t, c.expectedHealthEventValue, healthStatusEvent.Healthy) + assert.Equal(t, c.expectedError, healthStatusEvent.Error) + wg.Done() + atomic.AddInt64(&eventsReceivedCount, 1) + return + } + }() + if c.expectedBatchAcceptedEvent { + go func() { + select { + case batchAcceptedEvent := <-BatchAcceptedSubscription.Out(): + t.Logf("got batch accepted event: %v", batchAcceptedEvent) + batchAcceptedEventData := batchAcceptedEvent.Data().(*settlement.EventDataNewSettlementBatchAccepted) + assert.Equal(t, batchAcceptedEventData.EndHeight, batch.EndHeight) + wg.Done() + atomic.AddInt64(&eventsReceivedCount, 1) + return + } + }() + } + go func() { + select { + case <-time.After(2 * time.Second): + eventsRecievedCount := int(atomic.LoadInt64(&eventsReceivedCount)) + if eventsRecievedCount < eventsCount { + t.Error("Didn't recieve all events expected") + for i := 0; i < eventsCount-eventsRecievedCount; i++ { + wg.Done() + } + } + return + } + }() + // Post the batch go hubClient.PostBatch(batch, da.Mock, &da.ResultSubmitBatch{}) // Wait for the batch to be submitted and submit an event notifying that the batch was accepted time.Sleep(50 * time.Millisecond) - if c.isBatchAccepted { + if c.isBatchAcceptedHubEvent { batchAcceptedCh <- coretypes.ResultEvent{ Query: fmt.Sprintf("state_update.rollapp_id='%s'", ""), Events: map[string][]string{ "state_update.num_blocks": {"1"}, - "state_update.start_height": {"0"}, + "state_update.start_height": {"1"}, "state_update.state_info_index": {"1"}, }, } } - go func() { - select { - case event := <-HealthSubscription.Out(): - healthStatusEvent := 
event.Data().(settlement.EventDataSettlementHealthStatus) - assert.Equal(t, c.expectedHealthStatus, healthStatusEvent.Healthy) - assert.Equal(t, c.expectedError, healthStatusEvent.Error) - done <- true - break - case <-time.After(1 * time.Second): - t.Error("expected health status event but didn't get one") - done <- true - break - } - }() - <-done + // Wait for all events to be handled + wg.Wait() // Stop the hub client and wait for it to stop hubClient.Stop() time.Sleep(1 * time.Second) diff --git a/settlement/settlement.go b/settlement/settlement.go index e5902395c..9d55cebdc 100644 --- a/settlement/settlement.go +++ b/settlement/settlement.go @@ -27,6 +27,7 @@ type BaseResult struct { Code StatusCode // Message may contain settlement layer specific information (like detailed error message, etc) Message string + //TODO(omritoptix): Move StateIndex to be part of the batch struct // StateIndex is the rollapp-specific index the batch was saved in the SL StateIndex uint64 }