diff --git a/settlement/dymension/dymension.go b/settlement/dymension/dymension.go index a71185604..7d47c27e9 100644 --- a/settlement/dymension/dymension.go +++ b/settlement/dymension/dymension.go @@ -6,7 +6,6 @@ import ( "strconv" "time" - "cosmossdk.io/errors" "github.com/avast/retry-go/v4" "github.com/cosmos/cosmos-sdk/client/flags" "github.com/cosmos/cosmos-sdk/codec" @@ -14,7 +13,9 @@ import ( cryptocodec "github.com/cosmos/cosmos-sdk/crypto/codec" "github.com/dymensionxyz/cosmosclient/cosmosclient" rollapptypes "github.com/dymensionxyz/dymension/x/rollapp/types" + "github.com/google/uuid" "github.com/ignite/cli/ignite/pkg/cosmosaccount" + "github.com/pkg/errors" cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types" sequencertypes "github.com/dymensionxyz/dymension/x/sequencer/types" @@ -46,6 +47,10 @@ const ( batchRetryAttempts = 10 ) +const ( + postBatchSubscriberPrefix = "postBatchSubscriber" +) + // LayerClient is intended only for usage in tests. type LayerClient struct { *settlement.BaseLayerClient @@ -197,18 +202,21 @@ func (d *HubClient) Stop() error { // PostBatch posts a batch to the Dymension Hub. it tries to post the batch until it is accepted by the settlement layer. // it emits success and failure events to the event bus accordingly. func (d *HubClient) PostBatch(batch *types.Batch, daClient da.Client, daResult *da.ResultSubmitBatch) { + msgUpdateState, err := d.convertBatchToMsgUpdateState(batch, daClient, daResult) if err != nil { panic(err) } - subscription, err := d.pubsub.Subscribe(d.ctx, "SLBatchPost", settlement.EventQueryNewSettlementBatchAccepted) + postBatchSubscriberClient := fmt.Sprintf("%s-%d-%s", postBatchSubscriberPrefix, batch.StartHeight, uuid.New().String()) + subscription, err := d.pubsub.Subscribe(d.ctx, postBatchSubscriberClient, settlement.EventQueryNewSettlementBatchAccepted) if err != nil { - d.logger.Error("failed to subscribe to state update events", "error", err) + d.logger.Error("failed to subscribe to state update events", "err", err) panic(err) } + //nolint:errcheck - defer d.pubsub.Unsubscribe(d.ctx, "SLBatchPost", settlement.EventQueryNewSettlementBatchAccepted) + defer d.pubsub.Unsubscribe(d.ctx, postBatchSubscriberClient, settlement.EventQueryNewSettlementBatchAccepted) // Try submitting the batch to the settlement layer. If submission (i.e only submission, not acceptance) fails we emit an unhealthy event // and try again in the next loop. If submission succeeds we wait for the batch to be accepted by the settlement layer. diff --git a/settlement/dymension/dymension_test.go b/settlement/dymension/dymension_test.go index 3954c1141..022369eec 100644 --- a/settlement/dymension/dymension_test.go +++ b/settlement/dymension/dymension_test.go @@ -208,10 +208,11 @@ func TestPostBatch(t *testing.T) { healthStatusEvent := healthEvent.Data().(*settlement.EventDataSettlementHealthStatus) assert.Equal(t, c.expectedHealthEventValue, healthStatusEvent.Healthy) assert.Equal(t, c.expectedError, healthStatusEvent.Error) - wg.Done() atomic.AddInt64(&eventsReceivedCount, 1) - return + case <-time.After(10 * time.Second): + t.Error("Didn't recieve health event") } + wg.Done() }() if c.expectedBatchAcceptedEvent { go func() { @@ -220,25 +221,14 @@ func TestPostBatch(t *testing.T) { t.Logf("got batch accepted event: %v", batchAcceptedEvent) batchAcceptedEventData := batchAcceptedEvent.Data().(*settlement.EventDataNewSettlementBatchAccepted) assert.Equal(t, batchAcceptedEventData.EndHeight, batch.EndHeight) - wg.Done() atomic.AddInt64(&eventsReceivedCount, 1) - return + case <-time.After(10 * time.Second): + t.Error("Didn't recieve batch accepted event") } + wg.Done() + }() } - go func() { - select { - case <-time.After(2 * time.Second): - eventsRecievedCount := int(atomic.LoadInt64(&eventsReceivedCount)) - if eventsRecievedCount < eventsCount { - t.Error("Didn't recieve all events expected") - for i := 0; i < eventsCount-eventsRecievedCount; i++ { - wg.Done() - } - } - return - } - }() // Post the batch go hubClient.PostBatch(batch, da.Mock, &da.ResultSubmitBatch{}) // Wait for the batch to be submitted and submit an event notifying that the batch was accepted @@ -253,8 +243,8 @@ func TestPostBatch(t *testing.T) { }, } } - // Wait for all events to be handled wg.Wait() + assert.Equal(t, eventsCount, int(eventsReceivedCount)) // Stop the hub client and wait for it to stop hubClient.Stop() time.Sleep(1 * time.Second)