From f7b9383cf55976f3f55780055df7a6517e189034 Mon Sep 17 00:00:00 2001 From: Omri Date: Thu, 16 May 2024 18:28:09 +0200 Subject: [PATCH] fix(settlement): fixed submission bug where multiple events would cancel subscription (#842) Co-authored-by: Michael Tsitrin <114929630+mtsitrin@users.noreply.github.com> Co-authored-by: Michael Tsitrin Co-authored-by: danwt <30197399+danwt@users.noreply.github.com> --- config/defaults.go | 23 ++-- config/toml.go | 1 + mempool/v1/mempool.go | 4 +- settlement/config.go | 25 ++-- settlement/dymension/dymension.go | 209 ++++++++++++++++-------------- settlement/events.go | 6 + 6 files changed, 147 insertions(+), 121 deletions(-) diff --git a/config/defaults.go b/config/defaults.go index f9597eb61..1ae4c95ff 100644 --- a/config/defaults.go +++ b/config/defaults.go @@ -62,17 +62,18 @@ func DefaultConfig(home, chainId string) *NodeConfig { } defaultSLconfig := settlement.Config{ - KeyringBackend: "test", - NodeAddress: "http://127.0.0.1:36657", - RollappID: chainId, - KeyringHomeDir: keyringDir, - DymAccountName: "sequencer", - GasPrices: "1000000000adym", - SLGrpc: defaultSlGrpcConfig, - RetryMinDelay: 1 * time.Second, - RetryMaxDelay: 10 * time.Second, - BatchAcceptanceTimeout: 120 * time.Second, - RetryAttempts: 10, + KeyringBackend: "test", + NodeAddress: "http://127.0.0.1:36657", + RollappID: chainId, + KeyringHomeDir: keyringDir, + DymAccountName: "sequencer", + GasPrices: "1000000000adym", + SLGrpc: defaultSlGrpcConfig, + RetryMinDelay: 1 * time.Second, + RetryMaxDelay: 10 * time.Second, + BatchAcceptanceTimeout: 120 * time.Second, + BatchAcceptanceAttempts: 5, + RetryAttempts: 10, } cfg.SettlementConfig = defaultSLconfig diff --git a/config/toml.go b/config/toml.go index f76e0141e..3e0291277 100644 --- a/config/toml.go +++ b/config/toml.go @@ -121,6 +121,7 @@ retry_max_delay = "{{ .SettlementConfig.RetryMaxDelay }}" retry_min_delay = "{{ .SettlementConfig.RetryMinDelay }}" retry_attempts = "{{ .SettlementConfig.RetryAttempts }}" batch_acceptance_timeout = "{{ .SettlementConfig.BatchAcceptanceTimeout }}" +batch_acceptance_attempts = "{{ .SettlementConfig.BatchAcceptanceAttempts }}" #keyring and key name to be used for sequencer keyring_backend = "{{ .SettlementConfig.KeyringBackend }}" diff --git a/mempool/v1/mempool.go b/mempool/v1/mempool.go index 308390b3b..4d98dcaaa 100644 --- a/mempool/v1/mempool.go +++ b/mempool/v1/mempool.go @@ -257,7 +257,7 @@ func (txmp *TxMempool) RemoveTxByKey(txKey types.TxKey) error { // The caller must hold txmp.mtx excluxively. func (txmp *TxMempool) removeTxByKey(key types.TxKey) error { if elt, ok := txmp.txByKey[key]; ok { - w := elt.Value.(*WrappedTx) + w, _ := elt.Value.(*WrappedTx) delete(txmp.txByKey, key) delete(txmp.txBySender, w.sender) txmp.txs.Remove(elt) @@ -272,7 +272,7 @@ func (txmp *TxMempool) removeTxByKey(key types.TxKey) error { // removeTxByElement removes the specified transaction element from the mempool. // The caller must hold txmp.mtx exclusively. func (txmp *TxMempool) removeTxByElement(elt *clist.CElement) { - w := elt.Value.(*WrappedTx) + w, _ := elt.Value.(*WrappedTx) delete(txmp.txByKey, w.tx.Key()) delete(txmp.txBySender, w.sender) txmp.txs.Remove(elt) diff --git a/settlement/config.go b/settlement/config.go index be6c5da46..628a149c3 100644 --- a/settlement/config.go +++ b/settlement/config.go @@ -7,18 +7,19 @@ import ( // Config for the DymensionLayerClient type Config struct { - KeyringBackend string `mapstructure:"keyring_backend"` - NodeAddress string `mapstructure:"settlement_node_address"` - KeyringHomeDir string `mapstructure:"keyring_home_dir"` - DymAccountName string `mapstructure:"dym_account_name"` - RollappID string `mapstructure:"rollapp_id"` - GasLimit uint64 `mapstructure:"settlement_gas_limit"` - GasPrices string `mapstructure:"settlement_gas_prices"` - GasFees string `mapstructure:"settlement_gas_fees"` - RetryAttempts uint `mapstructure:"retry_attempts"` - RetryMaxDelay time.Duration `mapstructure:"retry_max_delay"` - RetryMinDelay time.Duration `mapstructure:"retry_min_delay"` - BatchAcceptanceTimeout time.Duration `mapstructure:"batch_acceptance_timeout"` + KeyringBackend string `mapstructure:"keyring_backend"` + NodeAddress string `mapstructure:"settlement_node_address"` + KeyringHomeDir string `mapstructure:"keyring_home_dir"` + DymAccountName string `mapstructure:"dym_account_name"` + RollappID string `mapstructure:"rollapp_id"` + GasLimit uint64 `mapstructure:"settlement_gas_limit"` + GasPrices string `mapstructure:"settlement_gas_prices"` + GasFees string `mapstructure:"settlement_gas_fees"` + RetryAttempts uint `mapstructure:"retry_attempts"` + RetryMaxDelay time.Duration `mapstructure:"retry_max_delay"` + RetryMinDelay time.Duration `mapstructure:"retry_min_delay"` + BatchAcceptanceTimeout time.Duration `mapstructure:"batch_acceptance_timeout"` + BatchAcceptanceAttempts uint `mapstructure:"batch_acceptance_attempts"` // For testing only. probably should be refactored ProposerPubKey string `json:"proposer_pub_key"` // Config used for sl shared grpc mock diff --git a/settlement/dymension/dymension.go b/settlement/dymension/dymension.go index f45d0b76e..4890c9774 100644 --- a/settlement/dymension/dymension.go +++ b/settlement/dymension/dymension.go @@ -92,10 +92,11 @@ type HubClient struct { // channel for getting notified when a batch is accepted by the settlement layer. // only one batch of a specific height can get accepted and we can are currently sending only one batch at a time. // for that reason it's safe to assume that if a batch is accepted, it refers to the last batch we've sent. - retryAttempts uint - retryMinDelay time.Duration - retryMaxDelay time.Duration - batchAcceptanceTimeout time.Duration + retryAttempts uint + retryMinDelay time.Duration + retryMaxDelay time.Duration + batchAcceptanceTimeout time.Duration + batchAcceptanceAttempts uint } var _ settlement.HubClient = &HubClient{} @@ -149,25 +150,26 @@ func NewDymensionHubClient(config settlement.Config, pubsub *pubsub.Server, logg cryptocodec.RegisterInterfaces(interfaceRegistry) protoCodec := codec.NewProtoCodec(interfaceRegistry) - dymesionHubClient := &HubClient{ - config: &config, - logger: logger, - pubsub: pubsub, - ctx: ctx, - cancel: cancel, - eventMap: eventMap, - protoCodec: protoCodec, - retryAttempts: config.RetryAttempts, - batchAcceptanceTimeout: config.BatchAcceptanceTimeout, - retryMinDelay: config.RetryMinDelay, - retryMaxDelay: config.RetryMaxDelay, + dymensionHubClient := &HubClient{ + config: &config, + logger: logger, + pubsub: pubsub, + ctx: ctx, + cancel: cancel, + eventMap: eventMap, + protoCodec: protoCodec, + retryAttempts: config.RetryAttempts, + batchAcceptanceTimeout: config.BatchAcceptanceTimeout, + batchAcceptanceAttempts: config.BatchAcceptanceAttempts, + retryMinDelay: config.RetryMinDelay, + retryMaxDelay: config.RetryMaxDelay, } for _, option := range options { - option(dymesionHubClient) + option(dymensionHubClient) } - if dymesionHubClient.client == nil { + if dymensionHubClient.client == nil { client, err := cosmosclient.New( ctx, getCosmosClientOptions(&config)..., @@ -175,12 +177,12 @@ func NewDymensionHubClient(config settlement.Config, pubsub *pubsub.Server, logg if err != nil { return nil, err } - dymesionHubClient.client = NewCosmosClient(client) + dymensionHubClient.client = NewCosmosClient(client) } - dymesionHubClient.rollappQueryClient = dymesionHubClient.client.GetRollappClient() - dymesionHubClient.sequencerQueryClient = dymesionHubClient.client.GetSequencerClient() + dymensionHubClient.rollappQueryClient = dymensionHubClient.client.GetRollappClient() + dymensionHubClient.sequencerQueryClient = dymensionHubClient.client.GetSequencerClient() - return dymesionHubClient, nil + return dymensionHubClient, nil } // Start starts the HubClient. @@ -214,7 +216,7 @@ func (d *HubClient) PostBatch(batch *types.Batch, daClient da.Client, daResult * // TODO: probably should be changed to be a channel, as the eventHandler is also in the HubClient in he produces the event postBatchSubscriberClient := fmt.Sprintf("%s-%d-%s", postBatchSubscriberPrefix, batch.StartHeight, uuid.New().String()) - subscription, err := d.pubsub.Subscribe(d.ctx, postBatchSubscriberClient, settlement.EventQueryNewSettlementBatchAccepted) + subscription, err := d.pubsub.Subscribe(d.ctx, postBatchSubscriberClient, settlement.EventQueryNewSettlementBatchAccepted, 1000) if err != nil { return fmt.Errorf("pub sub subscribe to settlement state updates: %w", err) } @@ -222,15 +224,12 @@ func (d *HubClient) PostBatch(batch *types.Batch, daClient da.Client, daResult * //nolint:errcheck defer d.pubsub.Unsubscribe(d.ctx, postBatchSubscriberClient, settlement.EventQueryNewSettlementBatchAccepted) - // Try submitting the batch to the settlement layer. If submission (i.e. only submission, not acceptance) fails we emit an unhealthy event - // and try again in the next loop. If submission succeeds we wait for the batch to be accepted by the settlement layer. - // If it is not accepted we emit an unhealthy event and start again the submission loop. + // Try submitting the batch to the settlement layer: + // 1. broadcast the transaction to the blockchain (with retries). + // 2. wait for the batch to be accepted by the settlement layer. for { - select { - case <-d.ctx.Done(): - return d.ctx.Err() - default: - err := d.submitBatch(msgUpdateState) + err := d.RunWithRetryInfinitely(func() error { + err := d.broadcastBatch(msgUpdateState) if err != nil { d.logger.Error( "Submit batch", @@ -241,49 +240,61 @@ func (d *HubClient) PostBatch(batch *types.Batch, daClient da.Client, daResult * "error", err, ) - - // Sleep to allow context cancellation to take effect before retrying - time.Sleep(100 * time.Millisecond) - continue } + return err + }) + if err != nil { + return fmt.Errorf("broadcast batch: %w", err) } // Batch was submitted successfully. Wait for it to be accepted by the settlement layer. timer := time.NewTimer(d.batchAcceptanceTimeout) defer timer.Stop() - - select { - case <-d.ctx.Done(): - return d.ctx.Err() - - case <-subscription.Cancelled(): - return fmt.Errorf("subscription cancelled") - - case <-subscription.Out(): - d.logger.Info("batch accepted", "startHeight", batch.StartHeight, "endHeight", batch.EndHeight) - return nil - - case <-timer.C: - // Check if the batch was accepted by the settlement layer, and we've just missed the event. - includedBatch, err := d.waitForBatchInclusion(batch.StartHeight) - if err != nil { - d.logger.Error( - "Wait for batch inclusion", - "startHeight", - batch.StartHeight, - "endHeight", - batch.EndHeight, - "error", - err, - ) - - timer.Stop() // we don't forget to clean up - continue + attempt := uint64(0) + + for { + select { + case <-d.ctx.Done(): + return d.ctx.Err() + + case <-subscription.Cancelled(): + return fmt.Errorf("subscription cancelled") + + case event := <-subscription.Out(): + eventData, _ := event.Data().(*settlement.EventDataNewBatchAccepted) + if eventData.EndHeight != batch.EndHeight { + d.logger.Debug("Received event for a different batch, ignoring.", "event", eventData) + continue + } + d.logger.Info("Batch accepted.", "startHeight", batch.StartHeight, "endHeight", batch.EndHeight, "stateIndex", eventData.StateIndex) + return nil + + case <-timer.C: + // Check if the batch was accepted by the settlement layer, and we've just missed the event. + attempt++ + includedBatch, err := d.pollForBatchInclusion(batch.EndHeight, attempt) + if err == nil && !includedBatch { + // no error, but still not included + timer.Reset(d.batchAcceptanceTimeout) + continue + } + // all good + if err == nil { + d.logger.Info("Batch accepted", "startHeight", batch.StartHeight, "endHeight", batch.EndHeight) + return nil + } } - - // all good - d.logger.Info("batch accepted", "startHeight", includedBatch.StartHeight, "endHeight", includedBatch.EndHeight, "index", includedBatch.StateIndex) - return nil + // If errored polling, start again the submission loop. + d.logger.Error( + "Wait for batch inclusion", + "startHeight", + batch.StartHeight, + "endHeight", + batch.EndHeight, + "error", + err, + ) + break } } } @@ -386,23 +397,23 @@ func (d *HubClient) GetSequencers(rollappID string) ([]*types.Sequencer, error) return sequencersList, nil } -func (d *HubClient) submitBatch(msgUpdateState *rollapptypes.MsgUpdateState) error { - err := d.RunWithRetry(func() error { - txResp, err := d.client.BroadcastTx(d.config.DymAccountName, msgUpdateState) - if err != nil || txResp.Code != 0 { - return fmt.Errorf("broadcast tx: %w", err) - } - return nil - }) - return err +func (d *HubClient) broadcastBatch(msgUpdateState *rollapptypes.MsgUpdateState) error { + txResp, err := d.client.BroadcastTx(d.config.DymAccountName, msgUpdateState) + if err != nil || txResp.Code != 0 { + return fmt.Errorf("broadcast tx: %w", err) + } + return nil } func (d *HubClient) eventHandler() { // TODO(omritoptix): eventsChannel should be a generic channel which is later filtered by the event type. - eventsChannel, err := d.client.SubscribeToEvents(d.ctx, "dymension-client", fmt.Sprintf(eventStateUpdate, d.config.RollappID)) + subscriber := fmt.Sprintf("dymension-client-%s", uuid.New().String()) + eventsChannel, err := d.client.SubscribeToEvents(d.ctx, subscriber, fmt.Sprintf(eventStateUpdate, d.config.RollappID), 1000) if err != nil { panic("Error subscribing to events") } + // TODO: add defer unsubscribeAll + for { select { case <-d.ctx.Done(): @@ -412,7 +423,6 @@ func (d *HubClient) eventHandler() { panic("Settlement WS disconnected") case event := <-eventsChannel: // Assert value is in map and publish it to the event bus - d.logger.Debug("Received event from settlement layer") _, ok := d.eventMap[event.Query] if !ok { d.logger.Debug("Ignoring event. Type not supported", "event", event) @@ -533,24 +543,19 @@ func (d *HubClient) convertStateInfoToResultRetrieveBatch(stateInfo *rollapptype }, nil } -// TODO(omritoptix): Change the retry attempts to be only for the batch polling. Also we need to have a more -// TODO: bullet proof check as theoretically the tx can stay in the mempool longer then our retry attempts. -func (d *HubClient) waitForBatchInclusion(batchStartHeight uint64) (*settlement.ResultRetrieveBatch, error) { - var res *settlement.ResultRetrieveBatch - err := d.RunWithRetry( - func() error { - latestBatch, err := d.GetLatestBatch(d.config.RollappID) - if err != nil { - return fmt.Errorf("get latest batch: %w", err) - } - if latestBatch.Batch.StartHeight != batchStartHeight { - return fmt.Errorf("latest batch start height not match expected start height: %w", gerr.ErrNotFound) - } - res = latestBatch - return nil - }, - ) - return res, err +// pollForBatchInclusion polls the hub for the inclusion of a batch with the given end height. +func (d *HubClient) pollForBatchInclusion(batchEndHeight, attempt uint64) (bool, error) { + latestBatch, err := d.GetLatestBatch(d.config.RollappID) + if err != nil { + return false, fmt.Errorf("get latest batch: %w", err) + } + + // no error, but still not included + if attempt >= uint64(d.batchAcceptanceAttempts) { + return false, fmt.Errorf("timed out waiting for batch inclusion on settlement layer") + } + + return latestBatch.Batch.EndHeight == batchEndHeight, nil } // RunWithRetry runs the given operation with retry, doing a number of attempts, and taking the last @@ -564,3 +569,15 @@ func (d *HubClient) RunWithRetry(operation func() error) error { retry.MaxDelay(d.retryMaxDelay), ) } + +// RunWithRetryInfinitely runs the given operation with retry, doing a number of attempts, and taking the last +// error only. It uses the context of the HubClient. +func (d *HubClient) RunWithRetryInfinitely(operation func() error) error { + return retry.Do(operation, + retry.Context(d.ctx), + retry.LastErrorOnly(true), + retry.Delay(d.retryMinDelay), + retry.Attempts(0), + retry.MaxDelay(d.retryMaxDelay), + ) +} diff --git a/settlement/events.go b/settlement/events.go index 09de1b07a..41447184e 100644 --- a/settlement/events.go +++ b/settlement/events.go @@ -1,6 +1,8 @@ package settlement import ( + "fmt" + "github.com/dymensionxyz/dymint/types" uevent "github.com/dymensionxyz/dymint/utils/event" ) @@ -33,6 +35,10 @@ type EventDataNewBatchAccepted struct { StateIndex uint64 } +func (e EventDataNewBatchAccepted) String() string { + return fmt.Sprintf("EndHeight: %d, StateIndex: %d", e.EndHeight, e.StateIndex) +} + type EventDataSequencersListUpdated struct { // Sequencers is the list of new sequencers Sequencers []types.Sequencer