Skip to content

Commit

Permalink
fix(settlement): fixed submission bug where multiple events would can…
Browse files Browse the repository at this point in the history
…cel subscription (#842)

Co-authored-by: Michael Tsitrin <[email protected]>
Co-authored-by: Michael Tsitrin <[email protected]>
Co-authored-by: danwt <[email protected]>
  • Loading branch information
4 people authored May 16, 2024
1 parent 4474f6e commit f7b9383
Show file tree
Hide file tree
Showing 6 changed files with 147 additions and 121 deletions.
23 changes: 12 additions & 11 deletions config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,18 @@ func DefaultConfig(home, chainId string) *NodeConfig {
}

defaultSLconfig := settlement.Config{
KeyringBackend: "test",
NodeAddress: "http://127.0.0.1:36657",
RollappID: chainId,
KeyringHomeDir: keyringDir,
DymAccountName: "sequencer",
GasPrices: "1000000000adym",
SLGrpc: defaultSlGrpcConfig,
RetryMinDelay: 1 * time.Second,
RetryMaxDelay: 10 * time.Second,
BatchAcceptanceTimeout: 120 * time.Second,
RetryAttempts: 10,
KeyringBackend: "test",
NodeAddress: "http://127.0.0.1:36657",
RollappID: chainId,
KeyringHomeDir: keyringDir,
DymAccountName: "sequencer",
GasPrices: "1000000000adym",
SLGrpc: defaultSlGrpcConfig,
RetryMinDelay: 1 * time.Second,
RetryMaxDelay: 10 * time.Second,
BatchAcceptanceTimeout: 120 * time.Second,
BatchAcceptanceAttempts: 5,
RetryAttempts: 10,
}
cfg.SettlementConfig = defaultSLconfig

Expand Down
1 change: 1 addition & 0 deletions config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ retry_max_delay = "{{ .SettlementConfig.RetryMaxDelay }}"
retry_min_delay = "{{ .SettlementConfig.RetryMinDelay }}"
retry_attempts = "{{ .SettlementConfig.RetryAttempts }}"
batch_acceptance_timeout = "{{ .SettlementConfig.BatchAcceptanceTimeout }}"
batch_acceptance_attempts = "{{ .SettlementConfig.BatchAcceptanceAttempts }}"
#keyring and key name to be used for sequencer
keyring_backend = "{{ .SettlementConfig.KeyringBackend }}"
Expand Down
4 changes: 2 additions & 2 deletions mempool/v1/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ func (txmp *TxMempool) RemoveTxByKey(txKey types.TxKey) error {
// The caller must hold txmp.mtx excluxively.
func (txmp *TxMempool) removeTxByKey(key types.TxKey) error {
if elt, ok := txmp.txByKey[key]; ok {
w := elt.Value.(*WrappedTx)
w, _ := elt.Value.(*WrappedTx)
delete(txmp.txByKey, key)
delete(txmp.txBySender, w.sender)
txmp.txs.Remove(elt)
Expand All @@ -272,7 +272,7 @@ func (txmp *TxMempool) removeTxByKey(key types.TxKey) error {
// removeTxByElement removes the specified transaction element from the mempool.
// The caller must hold txmp.mtx exclusively.
func (txmp *TxMempool) removeTxByElement(elt *clist.CElement) {
w := elt.Value.(*WrappedTx)
w, _ := elt.Value.(*WrappedTx)
delete(txmp.txByKey, w.tx.Key())
delete(txmp.txBySender, w.sender)
txmp.txs.Remove(elt)
Expand Down
25 changes: 13 additions & 12 deletions settlement/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,19 @@ import (

// Config for the DymensionLayerClient
type Config struct {
KeyringBackend string `mapstructure:"keyring_backend"`
NodeAddress string `mapstructure:"settlement_node_address"`
KeyringHomeDir string `mapstructure:"keyring_home_dir"`
DymAccountName string `mapstructure:"dym_account_name"`
RollappID string `mapstructure:"rollapp_id"`
GasLimit uint64 `mapstructure:"settlement_gas_limit"`
GasPrices string `mapstructure:"settlement_gas_prices"`
GasFees string `mapstructure:"settlement_gas_fees"`
RetryAttempts uint `mapstructure:"retry_attempts"`
RetryMaxDelay time.Duration `mapstructure:"retry_max_delay"`
RetryMinDelay time.Duration `mapstructure:"retry_min_delay"`
BatchAcceptanceTimeout time.Duration `mapstructure:"batch_acceptance_timeout"`
KeyringBackend string `mapstructure:"keyring_backend"`
NodeAddress string `mapstructure:"settlement_node_address"`
KeyringHomeDir string `mapstructure:"keyring_home_dir"`
DymAccountName string `mapstructure:"dym_account_name"`
RollappID string `mapstructure:"rollapp_id"`
GasLimit uint64 `mapstructure:"settlement_gas_limit"`
GasPrices string `mapstructure:"settlement_gas_prices"`
GasFees string `mapstructure:"settlement_gas_fees"`
RetryAttempts uint `mapstructure:"retry_attempts"`
RetryMaxDelay time.Duration `mapstructure:"retry_max_delay"`
RetryMinDelay time.Duration `mapstructure:"retry_min_delay"`
BatchAcceptanceTimeout time.Duration `mapstructure:"batch_acceptance_timeout"`
BatchAcceptanceAttempts uint `mapstructure:"batch_acceptance_attempts"`
// For testing only. probably should be refactored
ProposerPubKey string `json:"proposer_pub_key"`
// Config used for sl shared grpc mock
Expand Down
209 changes: 113 additions & 96 deletions settlement/dymension/dymension.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,11 @@ type HubClient struct {
// channel for getting notified when a batch is accepted by the settlement layer.
// only one batch of a specific height can get accepted and we can are currently sending only one batch at a time.
// for that reason it's safe to assume that if a batch is accepted, it refers to the last batch we've sent.
retryAttempts uint
retryMinDelay time.Duration
retryMaxDelay time.Duration
batchAcceptanceTimeout time.Duration
retryAttempts uint
retryMinDelay time.Duration
retryMaxDelay time.Duration
batchAcceptanceTimeout time.Duration
batchAcceptanceAttempts uint
}

var _ settlement.HubClient = &HubClient{}
Expand Down Expand Up @@ -149,38 +150,39 @@ func NewDymensionHubClient(config settlement.Config, pubsub *pubsub.Server, logg
cryptocodec.RegisterInterfaces(interfaceRegistry)
protoCodec := codec.NewProtoCodec(interfaceRegistry)

dymesionHubClient := &HubClient{
config: &config,
logger: logger,
pubsub: pubsub,
ctx: ctx,
cancel: cancel,
eventMap: eventMap,
protoCodec: protoCodec,
retryAttempts: config.RetryAttempts,
batchAcceptanceTimeout: config.BatchAcceptanceTimeout,
retryMinDelay: config.RetryMinDelay,
retryMaxDelay: config.RetryMaxDelay,
dymensionHubClient := &HubClient{
config: &config,
logger: logger,
pubsub: pubsub,
ctx: ctx,
cancel: cancel,
eventMap: eventMap,
protoCodec: protoCodec,
retryAttempts: config.RetryAttempts,
batchAcceptanceTimeout: config.BatchAcceptanceTimeout,
batchAcceptanceAttempts: config.BatchAcceptanceAttempts,
retryMinDelay: config.RetryMinDelay,
retryMaxDelay: config.RetryMaxDelay,
}

for _, option := range options {
option(dymesionHubClient)
option(dymensionHubClient)
}

if dymesionHubClient.client == nil {
if dymensionHubClient.client == nil {
client, err := cosmosclient.New(
ctx,
getCosmosClientOptions(&config)...,
)
if err != nil {
return nil, err
}
dymesionHubClient.client = NewCosmosClient(client)
dymensionHubClient.client = NewCosmosClient(client)
}
dymesionHubClient.rollappQueryClient = dymesionHubClient.client.GetRollappClient()
dymesionHubClient.sequencerQueryClient = dymesionHubClient.client.GetSequencerClient()
dymensionHubClient.rollappQueryClient = dymensionHubClient.client.GetRollappClient()
dymensionHubClient.sequencerQueryClient = dymensionHubClient.client.GetSequencerClient()

return dymesionHubClient, nil
return dymensionHubClient, nil
}

// Start starts the HubClient.
Expand Down Expand Up @@ -214,23 +216,20 @@ func (d *HubClient) PostBatch(batch *types.Batch, daClient da.Client, daResult *

// TODO: probably should be changed to be a channel, as the eventHandler is also in the HubClient in he produces the event
postBatchSubscriberClient := fmt.Sprintf("%s-%d-%s", postBatchSubscriberPrefix, batch.StartHeight, uuid.New().String())
subscription, err := d.pubsub.Subscribe(d.ctx, postBatchSubscriberClient, settlement.EventQueryNewSettlementBatchAccepted)
subscription, err := d.pubsub.Subscribe(d.ctx, postBatchSubscriberClient, settlement.EventQueryNewSettlementBatchAccepted, 1000)
if err != nil {
return fmt.Errorf("pub sub subscribe to settlement state updates: %w", err)
}

//nolint:errcheck
defer d.pubsub.Unsubscribe(d.ctx, postBatchSubscriberClient, settlement.EventQueryNewSettlementBatchAccepted)

// Try submitting the batch to the settlement layer. If submission (i.e. only submission, not acceptance) fails we emit an unhealthy event
// and try again in the next loop. If submission succeeds we wait for the batch to be accepted by the settlement layer.
// If it is not accepted we emit an unhealthy event and start again the submission loop.
// Try submitting the batch to the settlement layer:
// 1. broadcast the transaction to the blockchain (with retries).
// 2. wait for the batch to be accepted by the settlement layer.
for {
select {
case <-d.ctx.Done():
return d.ctx.Err()
default:
err := d.submitBatch(msgUpdateState)
err := d.RunWithRetryInfinitely(func() error {
err := d.broadcastBatch(msgUpdateState)
if err != nil {
d.logger.Error(
"Submit batch",
Expand All @@ -241,49 +240,61 @@ func (d *HubClient) PostBatch(batch *types.Batch, daClient da.Client, daResult *
"error",
err,
)

// Sleep to allow context cancellation to take effect before retrying
time.Sleep(100 * time.Millisecond)
continue
}
return err
})
if err != nil {
return fmt.Errorf("broadcast batch: %w", err)
}

// Batch was submitted successfully. Wait for it to be accepted by the settlement layer.
timer := time.NewTimer(d.batchAcceptanceTimeout)
defer timer.Stop()

select {
case <-d.ctx.Done():
return d.ctx.Err()

case <-subscription.Cancelled():
return fmt.Errorf("subscription cancelled")

case <-subscription.Out():
d.logger.Info("batch accepted", "startHeight", batch.StartHeight, "endHeight", batch.EndHeight)
return nil

case <-timer.C:
// Check if the batch was accepted by the settlement layer, and we've just missed the event.
includedBatch, err := d.waitForBatchInclusion(batch.StartHeight)
if err != nil {
d.logger.Error(
"Wait for batch inclusion",
"startHeight",
batch.StartHeight,
"endHeight",
batch.EndHeight,
"error",
err,
)

timer.Stop() // we don't forget to clean up
continue
attempt := uint64(0)

for {
select {
case <-d.ctx.Done():
return d.ctx.Err()

case <-subscription.Cancelled():
return fmt.Errorf("subscription cancelled")

case event := <-subscription.Out():
eventData, _ := event.Data().(*settlement.EventDataNewBatchAccepted)
if eventData.EndHeight != batch.EndHeight {
d.logger.Debug("Received event for a different batch, ignoring.", "event", eventData)
continue
}
d.logger.Info("Batch accepted.", "startHeight", batch.StartHeight, "endHeight", batch.EndHeight, "stateIndex", eventData.StateIndex)
return nil

case <-timer.C:
// Check if the batch was accepted by the settlement layer, and we've just missed the event.
attempt++
includedBatch, err := d.pollForBatchInclusion(batch.EndHeight, attempt)
if err == nil && !includedBatch {
// no error, but still not included
timer.Reset(d.batchAcceptanceTimeout)
continue
}
// all good
if err == nil {
d.logger.Info("Batch accepted", "startHeight", batch.StartHeight, "endHeight", batch.EndHeight)
return nil
}
}

// all good
d.logger.Info("batch accepted", "startHeight", includedBatch.StartHeight, "endHeight", includedBatch.EndHeight, "index", includedBatch.StateIndex)
return nil
// If errored polling, start again the submission loop.
d.logger.Error(
"Wait for batch inclusion",
"startHeight",
batch.StartHeight,
"endHeight",
batch.EndHeight,
"error",
err,
)
break
}
}
}
Expand Down Expand Up @@ -386,23 +397,23 @@ func (d *HubClient) GetSequencers(rollappID string) ([]*types.Sequencer, error)
return sequencersList, nil
}

func (d *HubClient) submitBatch(msgUpdateState *rollapptypes.MsgUpdateState) error {
err := d.RunWithRetry(func() error {
txResp, err := d.client.BroadcastTx(d.config.DymAccountName, msgUpdateState)
if err != nil || txResp.Code != 0 {
return fmt.Errorf("broadcast tx: %w", err)
}
return nil
})
return err
func (d *HubClient) broadcastBatch(msgUpdateState *rollapptypes.MsgUpdateState) error {
txResp, err := d.client.BroadcastTx(d.config.DymAccountName, msgUpdateState)
if err != nil || txResp.Code != 0 {
return fmt.Errorf("broadcast tx: %w", err)
}
return nil
}

func (d *HubClient) eventHandler() {
// TODO(omritoptix): eventsChannel should be a generic channel which is later filtered by the event type.
eventsChannel, err := d.client.SubscribeToEvents(d.ctx, "dymension-client", fmt.Sprintf(eventStateUpdate, d.config.RollappID))
subscriber := fmt.Sprintf("dymension-client-%s", uuid.New().String())
eventsChannel, err := d.client.SubscribeToEvents(d.ctx, subscriber, fmt.Sprintf(eventStateUpdate, d.config.RollappID), 1000)
if err != nil {
panic("Error subscribing to events")
}
// TODO: add defer unsubscribeAll

for {
select {
case <-d.ctx.Done():
Expand All @@ -412,7 +423,6 @@ func (d *HubClient) eventHandler() {
panic("Settlement WS disconnected")
case event := <-eventsChannel:
// Assert value is in map and publish it to the event bus
d.logger.Debug("Received event from settlement layer")
_, ok := d.eventMap[event.Query]
if !ok {
d.logger.Debug("Ignoring event. Type not supported", "event", event)
Expand Down Expand Up @@ -533,24 +543,19 @@ func (d *HubClient) convertStateInfoToResultRetrieveBatch(stateInfo *rollapptype
}, nil
}

// TODO(omritoptix): Change the retry attempts to be only for the batch polling. Also we need to have a more
// TODO: bullet proof check as theoretically the tx can stay in the mempool longer then our retry attempts.
func (d *HubClient) waitForBatchInclusion(batchStartHeight uint64) (*settlement.ResultRetrieveBatch, error) {
var res *settlement.ResultRetrieveBatch
err := d.RunWithRetry(
func() error {
latestBatch, err := d.GetLatestBatch(d.config.RollappID)
if err != nil {
return fmt.Errorf("get latest batch: %w", err)
}
if latestBatch.Batch.StartHeight != batchStartHeight {
return fmt.Errorf("latest batch start height not match expected start height: %w", gerr.ErrNotFound)
}
res = latestBatch
return nil
},
)
return res, err
// pollForBatchInclusion polls the hub for the inclusion of a batch with the given end height.
func (d *HubClient) pollForBatchInclusion(batchEndHeight, attempt uint64) (bool, error) {
latestBatch, err := d.GetLatestBatch(d.config.RollappID)
if err != nil {
return false, fmt.Errorf("get latest batch: %w", err)
}

// no error, but still not included
if attempt >= uint64(d.batchAcceptanceAttempts) {
return false, fmt.Errorf("timed out waiting for batch inclusion on settlement layer")
}

return latestBatch.Batch.EndHeight == batchEndHeight, nil
}

// RunWithRetry runs the given operation with retry, doing a number of attempts, and taking the last
Expand All @@ -564,3 +569,15 @@ func (d *HubClient) RunWithRetry(operation func() error) error {
retry.MaxDelay(d.retryMaxDelay),
)
}

// RunWithRetryInfinitely runs the given operation with retry, doing a number of attempts, and taking the last
// error only. It uses the context of the HubClient.
func (d *HubClient) RunWithRetryInfinitely(operation func() error) error {
return retry.Do(operation,
retry.Context(d.ctx),
retry.LastErrorOnly(true),
retry.Delay(d.retryMinDelay),
retry.Attempts(0),
retry.MaxDelay(d.retryMaxDelay),
)
}
Loading

0 comments on commit f7b9383

Please sign in to comment.