Skip to content

Commit

Permalink
fix: checking DA received batches (#527)
Browse files Browse the repository at this point in the history
  • Loading branch information
mtsitrin authored Nov 27, 2023
1 parent aa655a4 commit d2b2cdf
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 32 deletions.
2 changes: 1 addition & 1 deletion block/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (

const (
connectionRefusedErrorMessage = "connection refused"
batchNotFoundErrorMessage = "batch not found"
batchNotFoundErrorMessage = "no batches found"
)

func TestInitialState(t *testing.T) {
Expand Down
49 changes: 34 additions & 15 deletions block/retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ func (m *Manager) RetriveLoop(ctx context.Context) {
default:
// Get only the latest sync target
syncTarget := syncTargetpoller.Next()
m.syncUntilTarget(ctx, *(*uint64)(syncTarget))
err := m.syncUntilTarget(ctx, *(*uint64)(syncTarget))
if err != nil {
panic(err)
}
// Check if after we sync we are synced or a new syncTarget was already set.
// If we are synced then signal all goroutines waiting on isSyncedCond.
if m.store.Height() >= atomic.LoadUint64(&m.syncTarget) {
Expand All @@ -37,26 +40,36 @@ func (m *Manager) RetriveLoop(ctx context.Context) {
// syncUntilTarget syncs the block until the syncTarget is reached.
// It fetches the batches from the settlement, gets the DA height and gets
// the actual blocks from the DA.
func (m *Manager) syncUntilTarget(ctx context.Context, syncTarget uint64) {
func (m *Manager) syncUntilTarget(ctx context.Context, syncTarget uint64) error {
currentHeight := m.store.Height()
for currentHeight < syncTarget {
m.logger.Info("Syncing until target", "current height", currentHeight, "syncTarget", syncTarget)
resultRetrieveBatch, err := m.settlementClient.RetrieveBatch(atomic.LoadUint64(&m.lastState.SLStateIndex) + 1)
currStateIdx := atomic.LoadUint64(&m.lastState.SLStateIndex) + 1
m.logger.Info("Syncing until target", "height", currentHeight, "state_index", currStateIdx, "syncTarget", syncTarget)
settlementBatch, err := m.settlementClient.RetrieveBatch(currStateIdx)
if err != nil {
m.logger.Error("Failed to sync until target. error while retrieving batch", "error", err)
continue
return err
}
err = m.processNextDABatch(ctx, resultRetrieveBatch.MetaData.DA.Height)
if err != nil {
m.logger.Error("Failed to sync until target. error while processing next DA batch", "error", err)
break

if settlementBatch.StartHeight != currentHeight+1 {
return fmt.Errorf("settlement batch start height (%d) on index (%d) is not the expected", settlementBatch.StartHeight, currStateIdx)
}
err = m.updateStateIndex(resultRetrieveBatch.StateIndex)

err = m.processNextDABatch(ctx, settlementBatch.MetaData.DA.Height)
if err != nil {
return
return err
}

currentHeight = m.store.Height()
if currentHeight != settlementBatch.EndHeight {
return fmt.Errorf("after applying state index (%d), the height (%d) is not as expected (%d)", currStateIdx, currentHeight, settlementBatch.EndHeight)
}

err = m.updateStateIndex(settlementBatch.StateIndex)
if err != nil {
return err
}
}
return nil
}

func (m *Manager) updateStateIndex(stateIndex uint64) error {
Expand All @@ -73,10 +86,11 @@ func (m *Manager) processNextDABatch(ctx context.Context, daHeight uint64) error
m.logger.Debug("trying to retrieve batch from DA", "daHeight", daHeight)
batchResp, err := m.fetchBatch(daHeight)
if err != nil {
m.logger.Error("failed to retrieve batch from DA", "daHeight", daHeight, "error", err)
return err
}

m.logger.Debug("retrieved batches", "n", len(batchResp.Batches), "daHeight", daHeight)

for _, batch := range batchResp.Batches {
for i, block := range batch.Blocks {
err := m.applyBlock(ctx, block, batch.Commits[i], blockMetaData{source: daBlock, daHeight: daHeight})
Expand All @@ -93,9 +107,14 @@ func (m *Manager) fetchBatch(daHeight uint64) (da.ResultRetrieveBatch, error) {
batchRes := m.retriever.RetrieveBatches(daHeight)
switch batchRes.Code {
case da.StatusError:
err = fmt.Errorf("failed to retrieve batch: %s", batchRes.Message)
err = fmt.Errorf("failed to retrieve batch from height %d: %s", daHeight, batchRes.Message)
case da.StatusTimeout:
err = fmt.Errorf("timeout during retrieve batch: %s", batchRes.Message)
err = fmt.Errorf("timeout during retrieve batch from height %d: %s", daHeight, batchRes.Message)
}

if len(batchRes.Batches) == 0 {
err = fmt.Errorf("no batches found on height %d", daHeight)
}

return batchRes, err
}
9 changes: 5 additions & 4 deletions da/celestia/celestia.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,16 +264,16 @@ func (c *DataAvailabilityLayerClient) RetrieveBatches(dataLayerHeight uint64) da
}
}

batches := make([]*types.Batch, len(data))
var batches []*types.Batch
for i, msg := range data {
var batch pb.Batch
err = proto.Unmarshal(msg, &batch)
if err != nil {
c.logger.Error("failed to unmarshal batch", "daHeight", dataLayerHeight, "position", i, "error", err)
c.logger.Error("failed to unmarshal block", "daHeight", dataLayerHeight, "position", i, "error", err)
continue
}
batches[i] = new(types.Batch)
err := batches[i].FromProto(&batch)
parsedBatch := new(types.Batch)
err := parsedBatch.FromProto(&batch)
if err != nil {
return da.ResultRetrieveBatch{
BaseResult: da.BaseResult{
Expand All @@ -282,6 +282,7 @@ func (c *DataAvailabilityLayerClient) RetrieveBatches(dataLayerHeight uint64) da
},
}
}
batches = append(batches, parsedBatch)
}

return da.ResultRetrieveBatch{
Expand Down
2 changes: 2 additions & 0 deletions da/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ func (m *DataAvailabilityLayerClient) SubmitBatch(batch *types.Batch) da.ResultS
return da.ResultSubmitBatch{BaseResult: da.BaseResult{Code: da.StatusError, Message: err.Error()}}
}

atomic.StoreUint64(&m.daHeight, daHeight+1)

return da.ResultSubmitBatch{
BaseResult: da.BaseResult{
Code: da.StatusSuccess,
Expand Down
50 changes: 38 additions & 12 deletions settlement/dymension/dymension.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,14 +292,22 @@ func (d *HubClient) PostBatch(batch *types.Batch, daClient da.Client, daResult *

// GetLatestBatch returns the latest batch from the Dymension Hub.
func (d *HubClient) GetLatestBatch(rollappID string) (*settlement.ResultRetrieveBatch, error) {
latestStateInfoIndexResp, err := d.rollappQueryClient.LatestStateIndex(d.ctx,
&rollapptypes.QueryGetLatestStateIndexRequest{RollappId: d.config.RollappID})
if latestStateInfoIndexResp == nil {
return nil, settlement.ErrBatchNotFound
}
var latestStateInfoIndexResp *rollapptypes.QueryGetLatestStateIndexResponse
err := retry.Do(func() error {
var err error
latestStateInfoIndexResp, err = d.rollappQueryClient.LatestStateIndex(d.ctx,
&rollapptypes.QueryGetLatestStateIndexRequest{RollappId: d.config.RollappID})
return err
}, retry.Context(d.ctx), retry.LastErrorOnly(true),
retry.Delay(d.batchRetryDelay), retry.Attempts(d.batchRetryAttempts), retry.MaxDelay(batchRetryMaxDelay))

if err != nil {
return nil, err
}
if latestStateInfoIndexResp == nil {
return nil, settlement.ErrBatchNotFound
}

latestBatch, err := d.GetBatchAtIndex(rollappID, latestStateInfoIndexResp.StateIndex.Index)
if err != nil {
return nil, err
Expand All @@ -309,23 +317,41 @@ func (d *HubClient) GetLatestBatch(rollappID string) (*settlement.ResultRetrieve

// GetBatchAtIndex returns the batch at the given index from the Dymension Hub.
func (d *HubClient) GetBatchAtIndex(rollappID string, index uint64) (*settlement.ResultRetrieveBatch, error) {
stateInfoResp, err := d.rollappQueryClient.StateInfo(d.ctx,
&rollapptypes.QueryGetStateInfoRequest{RollappId: d.config.RollappID, Index: index})
if stateInfoResp == nil {
return nil, settlement.ErrBatchNotFound
}
var stateInfoResp *rollapptypes.QueryGetStateInfoResponse
err := retry.Do(func() error {
var err error
stateInfoResp, err = d.rollappQueryClient.StateInfo(d.ctx,
&rollapptypes.QueryGetStateInfoRequest{RollappId: d.config.RollappID, Index: index})
return err
}, retry.Context(d.ctx), retry.LastErrorOnly(true),
retry.Delay(d.batchRetryDelay), retry.Attempts(d.batchRetryAttempts), retry.MaxDelay(batchRetryMaxDelay))

if err != nil {
return nil, err
}
if stateInfoResp == nil {
return nil, settlement.ErrBatchNotFound
}
return d.convertStateInfoToResultRetrieveBatch(&stateInfoResp.StateInfo)
}

// GetSequencers returns the sequence of the given rollapp.
func (d *HubClient) GetSequencers(rollappID string) ([]*types.Sequencer, error) {
sequencers, err := d.sequencerQueryClient.SequencersByRollapp(d.ctx, &sequencertypes.QueryGetSequencersByRollappRequest{RollappId: d.config.RollappID})
var sequencers *sequencertypes.QueryGetSequencersByRollappResponse
err := retry.Do(func() error {
var err error
sequencers, err = d.sequencerQueryClient.SequencersByRollapp(d.ctx, &sequencertypes.QueryGetSequencersByRollappRequest{RollappId: d.config.RollappID})
if err != nil {
return errors.Wrapf(settlement.ErrNoSequencerForRollapp, "rollappID: %s", rollappID)
}
return nil
}, retry.Context(d.ctx), retry.LastErrorOnly(true),
retry.Delay(d.batchRetryDelay), retry.Attempts(d.batchRetryAttempts), retry.MaxDelay(batchRetryMaxDelay))

if err != nil {
return nil, errors.Wrapf(settlement.ErrNoSequencerForRollapp, "rollappID: %s", rollappID)
return nil, err
}

sequencersList := []*types.Sequencer{}
for _, sequencer := range sequencers.SequencerInfoList {
var pubKey cryptotypes.PubKey
Expand Down

0 comments on commit d2b2cdf

Please sign in to comment.