diff --git a/settlement/base.go b/settlement/base.go index e623dc179..e7673519d 100644 --- a/settlement/base.go +++ b/settlement/base.go @@ -76,7 +76,7 @@ func (b *BaseLayerClient) Start() error { endHeight = latestBatch.EndHeight } b.latestHeight = endHeight - b.logger.Info("Updated latest height from settlement layer", "latestHeight", b.latestHeight) + b.logger.Info("Updated latest height from settlement layer", "latestHeight", endHeight) b.sequencersList, err = b.fetchSequencersList() if err != nil { if err == ErrNoSequencerForRollapp { @@ -85,6 +85,7 @@ func (b *BaseLayerClient) Start() error { return err } b.logger.Info("Updated sequencers list from settlement layer", "sequencersList", b.sequencersList) + go b.stateUpdatesHandler() err = b.client.Start() if err != nil { @@ -123,7 +124,6 @@ func (b *BaseLayerClient) SubmitBatch(batch *types.Batch, daClient da.Client, da } } b.logger.Info("Successfully submitted batch to settlement layer", "tx hash", txResp.GetTxHash()) - atomic.StoreUint64(&b.latestHeight, batch.EndHeight) return &ResultSubmitBatch{ BaseResult: BaseResult{Code: StatusSuccess}, } @@ -206,3 +206,22 @@ func (b *BaseLayerClient) validateBatch(batch *types.Batch) error { } return nil } + +func (b *BaseLayerClient) stateUpdatesHandler() { + b.logger.Info("Started state updates handler loop") + subscription, err := b.pubsub.Subscribe(context.Background(), "stateUpdatesHandler", EventQueryNewSettlementBatchAccepted) + if err != nil { + b.logger.Error("failed to subscribe to state update events") + panic(err) + } + for { + select { + case event := <-subscription.Out(): + b.logger.Debug("Received state update event", "eventData", event.Data()) + eventData := event.Data().(*EventDataNewSettlementBatchAccepted) + atomic.StoreUint64(&b.latestHeight, eventData.EndHeight) + case <-subscription.Cancelled(): + b.logger.Info("Subscription canceled") + } + } +} diff --git a/settlement/mock/mock.go b/settlement/mock/mock.go index e26e554d6..be05db07b 100644 --- a/settlement/mock/mock.go +++ b/settlement/mock/mock.go @@ -8,6 +8,7 @@ import ( "errors" "strings" "sync/atomic" + "time" tmp2p "github.com/tendermint/tendermint/p2p" @@ -186,10 +187,14 @@ func (c *HubClient) PostBatch(batch *types.Batch, daClient da.Client, daResult * if err != nil { return PostBatchResp{err}, err } - err = c.pubsub.PublishWithEvents(context.Background(), &settlement.EventDataNewSettlementBatchAccepted{EndHeight: settlementBatch.EndHeight}, map[string][]string{settlement.EventTypeKey: {settlement.EventNewSettlementBatchAccepted}}) - if err != nil { - c.logger.Error("error publishing event", "error", err) - } + go func() { + // sleep for 100 miliseconds to mimic a delay in batch acceptance + time.Sleep(100 * time.Millisecond) + err = c.pubsub.PublishWithEvents(context.Background(), &settlement.EventDataNewSettlementBatchAccepted{EndHeight: settlementBatch.EndHeight}, map[string][]string{settlement.EventTypeKey: {settlement.EventNewSettlementBatchAccepted}}) + if err != nil { + c.logger.Error("error publishing event", "error", err) + } + }() return PostBatchResp{nil}, nil } diff --git a/settlement/settlement_test.go b/settlement/settlement_test.go index 8511ea332..855bcd77c 100644 --- a/settlement/settlement_test.go +++ b/settlement/settlement_test.go @@ -2,6 +2,7 @@ package settlement_test import ( "testing" + "time" "github.com/libp2p/go-libp2p-core/crypto" "github.com/stretchr/testify/assert" @@ -110,6 +111,8 @@ func TestSubmitAndRetrieve(t *testing.T) { } resultSubmitBatch := settlementClient.SubmitBatch(batch, da.Mock, daResult) assert.Equal(resultSubmitBatch.Code, settlement.StatusSuccess) + // sleep for 500 ms to make sure batch got accepted by the settlement layer + time.Sleep(500 * time.Millisecond) } // Retrieve the latest batch and make sure it matches latest batch submitted