Skip to content

Commit

Permalink
fix: dynamic subscriber name to avoid possible subscriber collision (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
omritoptix committed Aug 13, 2023
1 parent c9d6e50 commit 3c2f35c
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 22 deletions.
16 changes: 12 additions & 4 deletions settlement/dymension/dymension.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@ import (
"strconv"
"time"

"cosmossdk.io/errors"
"github.com/avast/retry-go/v4"
"github.com/cosmos/cosmos-sdk/client/flags"
"github.com/cosmos/cosmos-sdk/codec"
cdctypes "github.com/cosmos/cosmos-sdk/codec/types"
cryptocodec "github.com/cosmos/cosmos-sdk/crypto/codec"
"github.com/dymensionxyz/cosmosclient/cosmosclient"
rollapptypes "github.com/dymensionxyz/dymension/x/rollapp/types"
"github.com/google/uuid"
"github.com/ignite/cli/ignite/pkg/cosmosaccount"
"github.com/pkg/errors"

cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types"
sequencertypes "github.com/dymensionxyz/dymension/x/sequencer/types"
Expand Down Expand Up @@ -46,6 +47,10 @@ const (
batchRetryAttempts = 10
)

const (
postBatchSubscriberPrefix = "postBatchSubscriber"
)

// LayerClient is intended only for usage in tests.
type LayerClient struct {
*settlement.BaseLayerClient
Expand Down Expand Up @@ -197,18 +202,21 @@ func (d *HubClient) Stop() error {
// PostBatch posts a batch to the Dymension Hub. it tries to post the batch until it is accepted by the settlement layer.
// it emits success and failure events to the event bus accordingly.
func (d *HubClient) PostBatch(batch *types.Batch, daClient da.Client, daResult *da.ResultSubmitBatch) {

msgUpdateState, err := d.convertBatchToMsgUpdateState(batch, daClient, daResult)
if err != nil {
panic(err)
}

subscription, err := d.pubsub.Subscribe(d.ctx, "SLBatchPost", settlement.EventQueryNewSettlementBatchAccepted)
postBatchSubscriberClient := fmt.Sprintf("%s-%d-%s", postBatchSubscriberPrefix, batch.StartHeight, uuid.New().String())
subscription, err := d.pubsub.Subscribe(d.ctx, postBatchSubscriberClient, settlement.EventQueryNewSettlementBatchAccepted)
if err != nil {
d.logger.Error("failed to subscribe to state update events", "error", err)
d.logger.Error("failed to subscribe to state update events", "err", err)
panic(err)
}

//nolint:errcheck
defer d.pubsub.Unsubscribe(d.ctx, "SLBatchPost", settlement.EventQueryNewSettlementBatchAccepted)
defer d.pubsub.Unsubscribe(d.ctx, postBatchSubscriberClient, settlement.EventQueryNewSettlementBatchAccepted)

// Try submitting the batch to the settlement layer. If submission (i.e only submission, not acceptance) fails we emit an unhealthy event
// and try again in the next loop. If submission succeeds we wait for the batch to be accepted by the settlement layer.
Expand Down
26 changes: 8 additions & 18 deletions settlement/dymension/dymension_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,10 +208,11 @@ func TestPostBatch(t *testing.T) {
healthStatusEvent := healthEvent.Data().(*settlement.EventDataSettlementHealthStatus)
assert.Equal(t, c.expectedHealthEventValue, healthStatusEvent.Healthy)
assert.Equal(t, c.expectedError, healthStatusEvent.Error)
wg.Done()
atomic.AddInt64(&eventsReceivedCount, 1)
return
case <-time.After(10 * time.Second):
t.Error("Didn't recieve health event")
}
wg.Done()
}()
if c.expectedBatchAcceptedEvent {
go func() {
Expand All @@ -220,25 +221,14 @@ func TestPostBatch(t *testing.T) {
t.Logf("got batch accepted event: %v", batchAcceptedEvent)
batchAcceptedEventData := batchAcceptedEvent.Data().(*settlement.EventDataNewSettlementBatchAccepted)
assert.Equal(t, batchAcceptedEventData.EndHeight, batch.EndHeight)
wg.Done()
atomic.AddInt64(&eventsReceivedCount, 1)
return
case <-time.After(10 * time.Second):
t.Error("Didn't recieve batch accepted event")
}
wg.Done()

}()
}
go func() {
select {
case <-time.After(2 * time.Second):
eventsRecievedCount := int(atomic.LoadInt64(&eventsReceivedCount))
if eventsRecievedCount < eventsCount {
t.Error("Didn't recieve all events expected")
for i := 0; i < eventsCount-eventsRecievedCount; i++ {
wg.Done()
}
}
return
}
}()
// Post the batch
go hubClient.PostBatch(batch, da.Mock, &da.ResultSubmitBatch{})
// Wait for the batch to be submitted and submit an event notifying that the batch was accepted
Expand All @@ -253,8 +243,8 @@ func TestPostBatch(t *testing.T) {
},
}
}
// Wait for all events to be handled
wg.Wait()
assert.Equal(t, eventsCount, int(eventsReceivedCount))
// Stop the hub client and wait for it to stop
hubClient.Stop()
time.Sleep(1 * time.Second)
Expand Down

0 comments on commit 3c2f35c

Please sign in to comment.