From d2b2cdfb13ae4ae5124819909c0d35229cd73102 Mon Sep 17 00:00:00 2001 From: Michael Tsitrin <114929630+mtsitrin@users.noreply.github.com> Date: Mon, 27 Nov 2023 10:04:05 +0200 Subject: [PATCH] fix: checking DA received batches (#527) --- block/manager_test.go | 2 +- block/retriever.go | 49 ++++++++++++++++++++---------- da/celestia/celestia.go | 9 +++--- da/mock/mock.go | 2 ++ settlement/dymension/dymension.go | 50 +++++++++++++++++++++++-------- 5 files changed, 80 insertions(+), 32 deletions(-) diff --git a/block/manager_test.go b/block/manager_test.go index 83ff832ed..de94439db 100644 --- a/block/manager_test.go +++ b/block/manager_test.go @@ -33,7 +33,7 @@ import ( const ( connectionRefusedErrorMessage = "connection refused" - batchNotFoundErrorMessage = "batch not found" + batchNotFoundErrorMessage = "no batches found" ) func TestInitialState(t *testing.T) { diff --git a/block/retriever.go b/block/retriever.go index 91f848241..36823ffe2 100644 --- a/block/retriever.go +++ b/block/retriever.go @@ -21,7 +21,10 @@ func (m *Manager) RetriveLoop(ctx context.Context) { default: // Get only the latest sync target syncTarget := syncTargetpoller.Next() - m.syncUntilTarget(ctx, *(*uint64)(syncTarget)) + err := m.syncUntilTarget(ctx, *(*uint64)(syncTarget)) + if err != nil { + panic(err) + } // Check if after we sync we are synced or a new syncTarget was already set. // If we are synced then signal all goroutines waiting on isSyncedCond. if m.store.Height() >= atomic.LoadUint64(&m.syncTarget) { @@ -37,26 +40,36 @@ func (m *Manager) RetriveLoop(ctx context.Context) { // syncUntilTarget syncs the block until the syncTarget is reached. // It fetches the batches from the settlement, gets the DA height and gets // the actual blocks from the DA. -func (m *Manager) syncUntilTarget(ctx context.Context, syncTarget uint64) { +func (m *Manager) syncUntilTarget(ctx context.Context, syncTarget uint64) error { currentHeight := m.store.Height() for currentHeight < syncTarget { - m.logger.Info("Syncing until target", "current height", currentHeight, "syncTarget", syncTarget) - resultRetrieveBatch, err := m.settlementClient.RetrieveBatch(atomic.LoadUint64(&m.lastState.SLStateIndex) + 1) + currStateIdx := atomic.LoadUint64(&m.lastState.SLStateIndex) + 1 + m.logger.Info("Syncing until target", "height", currentHeight, "state_index", currStateIdx, "syncTarget", syncTarget) + settlementBatch, err := m.settlementClient.RetrieveBatch(currStateIdx) if err != nil { - m.logger.Error("Failed to sync until target. error while retrieving batch", "error", err) - continue + return err } - err = m.processNextDABatch(ctx, resultRetrieveBatch.MetaData.DA.Height) - if err != nil { - m.logger.Error("Failed to sync until target. error while processing next DA batch", "error", err) - break + + if settlementBatch.StartHeight != currentHeight+1 { + return fmt.Errorf("settlement batch start height (%d) on index (%d) is not the expected", settlementBatch.StartHeight, currStateIdx) } - err = m.updateStateIndex(resultRetrieveBatch.StateIndex) + + err = m.processNextDABatch(ctx, settlementBatch.MetaData.DA.Height) if err != nil { - return + return err } + currentHeight = m.store.Height() + if currentHeight != settlementBatch.EndHeight { + return fmt.Errorf("after applying state index (%d), the height (%d) is not as expected (%d)", currStateIdx, currentHeight, settlementBatch.EndHeight) + } + + err = m.updateStateIndex(settlementBatch.StateIndex) + if err != nil { + return err + } } + return nil } func (m *Manager) updateStateIndex(stateIndex uint64) error { @@ -73,10 +86,11 @@ func (m *Manager) processNextDABatch(ctx context.Context, daHeight uint64) error m.logger.Debug("trying to retrieve batch from DA", "daHeight", daHeight) batchResp, err := m.fetchBatch(daHeight) if err != nil { - m.logger.Error("failed to retrieve batch from DA", "daHeight", daHeight, "error", err) return err } + m.logger.Debug("retrieved batches", "n", len(batchResp.Batches), "daHeight", daHeight) + for _, batch := range batchResp.Batches { for i, block := range batch.Blocks { err := m.applyBlock(ctx, block, batch.Commits[i], blockMetaData{source: daBlock, daHeight: daHeight}) @@ -93,9 +107,14 @@ func (m *Manager) fetchBatch(daHeight uint64) (da.ResultRetrieveBatch, error) { batchRes := m.retriever.RetrieveBatches(daHeight) switch batchRes.Code { case da.StatusError: - err = fmt.Errorf("failed to retrieve batch: %s", batchRes.Message) + err = fmt.Errorf("failed to retrieve batch from height %d: %s", daHeight, batchRes.Message) case da.StatusTimeout: - err = fmt.Errorf("timeout during retrieve batch: %s", batchRes.Message) + err = fmt.Errorf("timeout during retrieve batch from height %d: %s", daHeight, batchRes.Message) + } + + if len(batchRes.Batches) == 0 { + err = fmt.Errorf("no batches found on height %d", daHeight) } + return batchRes, err } diff --git a/da/celestia/celestia.go b/da/celestia/celestia.go index 8793d8ba4..45a5adf63 100644 --- a/da/celestia/celestia.go +++ b/da/celestia/celestia.go @@ -264,16 +264,16 @@ func (c *DataAvailabilityLayerClient) RetrieveBatches(dataLayerHeight uint64) da } } - batches := make([]*types.Batch, len(data)) + var batches []*types.Batch for i, msg := range data { var batch pb.Batch err = proto.Unmarshal(msg, &batch) if err != nil { - c.logger.Error("failed to unmarshal batch", "daHeight", dataLayerHeight, "position", i, "error", err) + c.logger.Error("failed to unmarshal block", "daHeight", dataLayerHeight, "position", i, "error", err) continue } - batches[i] = new(types.Batch) - err := batches[i].FromProto(&batch) + parsedBatch := new(types.Batch) + err := parsedBatch.FromProto(&batch) if err != nil { return da.ResultRetrieveBatch{ BaseResult: da.BaseResult{ @@ -282,6 +282,7 @@ func (c *DataAvailabilityLayerClient) RetrieveBatches(dataLayerHeight uint64) da }, } } + batches = append(batches, parsedBatch) } return da.ResultRetrieveBatch{ diff --git a/da/mock/mock.go b/da/mock/mock.go index 8ff07e366..5aed6e993 100644 --- a/da/mock/mock.go +++ b/da/mock/mock.go @@ -93,6 +93,8 @@ func (m *DataAvailabilityLayerClient) SubmitBatch(batch *types.Batch) da.ResultS return da.ResultSubmitBatch{BaseResult: da.BaseResult{Code: da.StatusError, Message: err.Error()}} } + atomic.StoreUint64(&m.daHeight, daHeight+1) + return da.ResultSubmitBatch{ BaseResult: da.BaseResult{ Code: da.StatusSuccess, diff --git a/settlement/dymension/dymension.go b/settlement/dymension/dymension.go index 4b618510a..cfdd6cb69 100644 --- a/settlement/dymension/dymension.go +++ b/settlement/dymension/dymension.go @@ -292,14 +292,22 @@ func (d *HubClient) PostBatch(batch *types.Batch, daClient da.Client, daResult * // GetLatestBatch returns the latest batch from the Dymension Hub. func (d *HubClient) GetLatestBatch(rollappID string) (*settlement.ResultRetrieveBatch, error) { - latestStateInfoIndexResp, err := d.rollappQueryClient.LatestStateIndex(d.ctx, - &rollapptypes.QueryGetLatestStateIndexRequest{RollappId: d.config.RollappID}) - if latestStateInfoIndexResp == nil { - return nil, settlement.ErrBatchNotFound - } + var latestStateInfoIndexResp *rollapptypes.QueryGetLatestStateIndexResponse + err := retry.Do(func() error { + var err error + latestStateInfoIndexResp, err = d.rollappQueryClient.LatestStateIndex(d.ctx, + &rollapptypes.QueryGetLatestStateIndexRequest{RollappId: d.config.RollappID}) + return err + }, retry.Context(d.ctx), retry.LastErrorOnly(true), + retry.Delay(d.batchRetryDelay), retry.Attempts(d.batchRetryAttempts), retry.MaxDelay(batchRetryMaxDelay)) + if err != nil { return nil, err } + if latestStateInfoIndexResp == nil { + return nil, settlement.ErrBatchNotFound + } + latestBatch, err := d.GetBatchAtIndex(rollappID, latestStateInfoIndexResp.StateIndex.Index) if err != nil { return nil, err @@ -309,23 +317,41 @@ func (d *HubClient) GetLatestBatch(rollappID string) (*settlement.ResultRetrieve // GetBatchAtIndex returns the batch at the given index from the Dymension Hub. func (d *HubClient) GetBatchAtIndex(rollappID string, index uint64) (*settlement.ResultRetrieveBatch, error) { - stateInfoResp, err := d.rollappQueryClient.StateInfo(d.ctx, - &rollapptypes.QueryGetStateInfoRequest{RollappId: d.config.RollappID, Index: index}) - if stateInfoResp == nil { - return nil, settlement.ErrBatchNotFound - } + var stateInfoResp *rollapptypes.QueryGetStateInfoResponse + err := retry.Do(func() error { + var err error + stateInfoResp, err = d.rollappQueryClient.StateInfo(d.ctx, + &rollapptypes.QueryGetStateInfoRequest{RollappId: d.config.RollappID, Index: index}) + return err + }, retry.Context(d.ctx), retry.LastErrorOnly(true), + retry.Delay(d.batchRetryDelay), retry.Attempts(d.batchRetryAttempts), retry.MaxDelay(batchRetryMaxDelay)) + if err != nil { return nil, err } + if stateInfoResp == nil { + return nil, settlement.ErrBatchNotFound + } return d.convertStateInfoToResultRetrieveBatch(&stateInfoResp.StateInfo) } // GetSequencers returns the sequence of the given rollapp. func (d *HubClient) GetSequencers(rollappID string) ([]*types.Sequencer, error) { - sequencers, err := d.sequencerQueryClient.SequencersByRollapp(d.ctx, &sequencertypes.QueryGetSequencersByRollappRequest{RollappId: d.config.RollappID}) + var sequencers *sequencertypes.QueryGetSequencersByRollappResponse + err := retry.Do(func() error { + var err error + sequencers, err = d.sequencerQueryClient.SequencersByRollapp(d.ctx, &sequencertypes.QueryGetSequencersByRollappRequest{RollappId: d.config.RollappID}) + if err != nil { + return errors.Wrapf(settlement.ErrNoSequencerForRollapp, "rollappID: %s", rollappID) + } + return nil + }, retry.Context(d.ctx), retry.LastErrorOnly(true), + retry.Delay(d.batchRetryDelay), retry.Attempts(d.batchRetryAttempts), retry.MaxDelay(batchRetryMaxDelay)) + if err != nil { - return nil, errors.Wrapf(settlement.ErrNoSequencerForRollapp, "rollappID: %s", rollappID) + return nil, err } + sequencersList := []*types.Sequencer{} for _, sequencer := range sequencers.SequencerInfoList { var pubKey cryptotypes.PubKey