From 0cd09f3d0a8430c1e9f34814930c168553c143bc Mon Sep 17 00:00:00 2001 From: Daniel T <30197399+danwt@users.noreply.github.com> Date: Wed, 8 May 2024 11:20:19 +0100 Subject: [PATCH] feat(DA): expose configuration options for retry loop backoff (#757) Co-authored-by: github-actions Co-authored-by: zale144 Co-authored-by: Michael Tsitrin --- CHANGELOG.md | 1 + config/toml.go | 3 +- da/celestia/celestia.go | 82 +++++++++++++++++++++--------------- da/celestia/celestia_test.go | 2 +- da/celestia/config.go | 32 +++++++------- da/celestia/config_test.go | 63 +++++++++++++++++++++++++++ da/celestia/rpc_test.go | 4 +- da/da_test.go | 2 +- utils/retry/backoff.go | 6 +-- 9 files changed, 136 insertions(+), 59 deletions(-) create mode 100644 da/celestia/config_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 6d620c5f2..2de107fb7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ * **logs:** make logs more readable in a couple places, fix race cond ([#749](https://github.com/dymensionxyz/dymint/issues/749)) ([f05ef39](https://github.com/dymensionxyz/dymint/commit/f05ef3957b754c05fbc90aa39eabce80bbe65933)) * **manager:** get fresh height in loop ([#781](https://github.com/dymensionxyz/dymint/issues/781)) ([e4df480](https://github.com/dymensionxyz/dymint/commit/e4df48037a78965dbac9e747dd296f39360e396c)) * **p2p:** validate block before applying and not before caching in p2p gossiping ([#723](https://github.com/dymensionxyz/dymint/issues/723)) ([98371b5](https://github.com/dymensionxyz/dymint/commit/98371b5220613e70f3274fab5593e02ba532f7db)) +* **p2p:** validating gossiped block is created by the proposer ([#737](https://github.com/dymensionxyz/dymint/issues/737)) ([851b312](https://github.com/dymensionxyz/dymint/commit/851b312620233a9fb1abe55214a678322e7b0c68)) * **produce loop:** handle unauthenticated error in settlement layer ([#726](https://github.com/dymensionxyz/dymint/issues/726)) ([33e78d1](https://github.com/dymensionxyz/dymint/commit/33e78d116b5f14b91b8b3bda2b6cbfee9040e2d3)) * **rpc:** nil panic in rpc/json/handler.go WriteError ([#750](https://github.com/dymensionxyz/dymint/issues/750)) ([e09709b](https://github.com/dymensionxyz/dymint/commit/e09709b428a33da002defb9f13178fa19b81a69b)) * **settlement:** remove state index from proto ([#777](https://github.com/dymensionxyz/dymint/issues/777)) ([767b8fd](https://github.com/dymensionxyz/dymint/commit/767b8fdb490c37deee43ac023688410bbb98ccb0)) diff --git a/config/toml.go b/config/toml.go index 75b0e1e28..9be4f3117 100644 --- a/config/toml.go +++ b/config/toml.go @@ -79,6 +79,7 @@ batch_submit_max_time = "{{ .BlockManagerConfig.BatchSubmitMaxTime }}" ### da config ### da_layer = "{{ .DALayer }}" # mock, celestia, avail namespace_id = "{{ .BlockManagerConfig.NamespaceID }}" +# this should be json matching the celestia.Config type da_config = "{{ .DAConfig }}" # max size of batch in bytes that can be accepted by DA @@ -91,7 +92,7 @@ gossiped_blocks_cache_size = {{ .BlockManagerConfig.GossipedBlocksCacheSize }} bootstrap_time = "{{ .BootstrapTime }}" #celestia config example: -# da_config = "{\"base_url\": \"http://127.0.0.1:26658\", \"timeout\": 60000000000, \"gas_prices\":0.1, \"token\":\"TOKEN\"}" +# da_config = "{\"base_url\":\"http:\/\/127.0.0.1:26658\",\"timeout\":5000000000,\"gas_prices\":0.1,\"auth_token\":\"TOKEN\",\"backoff\":{\"initial_delay\":6000000000,\"max_delay\":6000000000,\"growth_factor\":2},\"retry_attempts\":4,\"retry_delay\":3000000000}" # Avail config example: # da_config = "{\"seed\": \"MNEMONIC\", \"api_url\": \"wss://kate.avail.tools/ws\", \"app_id\": 0, \"tip\":10}" diff --git a/da/celestia/celestia.go b/da/celestia/celestia.go index d717581f8..b46b627c3 100644 --- a/da/celestia/celestia.go +++ b/da/celestia/celestia.go @@ -32,14 +32,11 @@ import ( type DataAvailabilityLayerClient struct { rpc celtypes.CelestiaRPCClient - pubsubServer *pubsub.Server - config Config - logger types.Logger - ctx context.Context - cancel context.CancelFunc - rpcRetryDelay time.Duration - rpcRetryAttempts int - submitBackoff uretry.BackoffConfig + pubsubServer *pubsub.Server + config Config + logger types.Logger + ctx context.Context + cancel context.CancelFunc } var ( @@ -57,21 +54,21 @@ func WithRPCClient(rpc celtypes.CelestiaRPCClient) da.Option { // WithRPCRetryDelay sets failed rpc calls retry delay. func WithRPCRetryDelay(delay time.Duration) da.Option { return func(daLayerClient da.DataAvailabilityLayerClient) { - daLayerClient.(*DataAvailabilityLayerClient).rpcRetryDelay = delay + daLayerClient.(*DataAvailabilityLayerClient).config.RetryDelay = delay } } // WithRPCAttempts sets failed rpc calls retry attempts. func WithRPCAttempts(attempts int) da.Option { return func(daLayerClient da.DataAvailabilityLayerClient) { - daLayerClient.(*DataAvailabilityLayerClient).rpcRetryAttempts = attempts + daLayerClient.(*DataAvailabilityLayerClient).config.RetryAttempts = attempts } } // WithSubmitBackoff sets submit retry delay config. func WithSubmitBackoff(c uretry.BackoffConfig) da.Option { return func(daLayerClient da.DataAvailabilityLayerClient) { - daLayerClient.(*DataAvailabilityLayerClient).submitBackoff = c + daLayerClient.(*DataAvailabilityLayerClient).config.Backoff = c } } @@ -79,29 +76,15 @@ func WithSubmitBackoff(c uretry.BackoffConfig) da.Option { func (c *DataAvailabilityLayerClient) Init(config []byte, pubsubServer *pubsub.Server, kvStore store.KVStore, logger types.Logger, options ...da.Option) error { c.logger = logger - if len(config) <= 0 { - return errors.New("config is empty") - } - err := json.Unmarshal(config, &c.config) - if err != nil { - return err - } - err = (&c.config).InitNamespaceID() + var err error + c.config, err = createConfig(config) if err != nil { - return err + return fmt.Errorf("create config: %w: %w", err, gerr.ErrInvalidArgument) } - if c.config.GasPrices == 0 { - return errors.New("gas prices must be set") - } + c.ctx, c.cancel = context.WithCancel(context.Background()) c.pubsubServer = pubsubServer - // Set defaults - c.rpcRetryAttempts = defaultRpcCheckAttempts - c.rpcRetryDelay = defaultRpcRetryDelay - c.submitBackoff = defaultSubmitBackoff - - c.ctx, c.cancel = context.WithCancel(context.Background()) // Apply options for _, apply := range options { @@ -111,6 +94,35 @@ func (c *DataAvailabilityLayerClient) Init(config []byte, pubsubServer *pubsub.S return nil } +func createConfig(bz []byte) (c Config, err error) { + if len(bz) <= 0 { + return c, errors.New("supplied config is empty") + } + err = json.Unmarshal(bz, &c) + if err != nil { + return c, fmt.Errorf("json unmarshal: %w", err) + } + + err = c.InitNamespaceID() + if err != nil { + return c, fmt.Errorf("init namespace id: %w", err) + } + + if c.GasPrices == 0 { + return c, errors.New("gas prices must be set") + } + + // NOTE: 0 is valid value for RetryAttempts + + if c.RetryDelay == 0 { + c.RetryDelay = defaultRpcRetryDelay + } + if c.Backoff == (uretry.BackoffConfig{}) { + c.Backoff = defaultSubmitBackoff + } + return c, nil +} + // Start prepares DataAvailabilityLayerClient to work. func (c *DataAvailabilityLayerClient) Start() (err error) { c.logger.Info("starting Celestia Data Availability Layer Client") @@ -200,7 +212,7 @@ func (c *DataAvailabilityLayerClient) SubmitBatch(batch *types.Batch) da.ResultS } } - backoff := c.submitBackoff.Backoff() + backoff := c.config.Backoff.Backoff() for { select { @@ -294,9 +306,9 @@ func (c *DataAvailabilityLayerClient) RetrieveBatches(daMetaData *da.DASubmitMet return nil }, - retry.Attempts(uint(c.rpcRetryAttempts)), + retry.Attempts(uint(c.config.RetryAttempts)), retry.DelayType(retry.FixedDelay), - retry.Delay(c.rpcRetryDelay), + retry.Delay(c.config.RetryDelay), ) if err != nil { c.logger.Error("RetrieveBatches process failed", "error", err) @@ -423,7 +435,7 @@ func (c *DataAvailabilityLayerClient) CheckBatchAvailability(daMetaData *da.DASu } return nil - }, retry.Attempts(uint(c.rpcRetryAttempts)), retry.DelayType(retry.FixedDelay), retry.Delay(c.rpcRetryDelay)) + }, retry.Attempts(uint(c.config.RetryAttempts)), retry.DelayType(retry.FixedDelay), retry.Delay(c.config.RetryDelay)) if err != nil { c.logger.Error("CheckAvailability process failed", "error", err) } @@ -570,8 +582,8 @@ func (c *DataAvailabilityLayerClient) submit(daBlob da.Blob) (uint64, da.Commitm }, retry.Context(c.ctx), retry.LastErrorOnly(true), - retry.Delay(c.rpcRetryDelay), - retry.Attempts(uint(c.rpcRetryAttempts)), + retry.Delay(c.config.RetryDelay), + retry.Attempts(uint(c.config.RetryAttempts)), retry.DelayType(retry.FixedDelay), ) if err != nil { diff --git a/da/celestia/celestia_test.go b/da/celestia/celestia_test.go index e575fb0ba..a1777a10b 100644 --- a/da/celestia/celestia_test.go +++ b/da/celestia/celestia_test.go @@ -302,7 +302,7 @@ func setDAandMock(t *testing.T) (*mocks.MockCelestiaRPCClient, da.DataAvailabili config := celestia.Config{ BaseURL: "http://localhost:26658", Timeout: 30 * time.Second, - GasPrices: celestia.CelestiaDefaultConfig.GasPrices, + GasPrices: celestia.DefaultGasPrices, NamespaceIDStr: "0000000000000000ffff", } err = config.InitNamespaceID() diff --git a/da/celestia/config.go b/da/celestia/config.go index d76512e96..fafc40252 100644 --- a/da/celestia/config.go +++ b/da/celestia/config.go @@ -12,10 +12,9 @@ import ( ) const ( - defaultRpcRetryDelay = 1 * time.Second - defaultRpcCheckAttempts = 10 - namespaceVersion = 0 - defaultGasPrices = 0.1 + defaultRpcRetryDelay = 3 * time.Second + namespaceVersion = 0 + DefaultGasPrices = 0.1 ) var defaultSubmitBackoff = uretry.NewBackoffConfig( @@ -25,20 +24,23 @@ var defaultSubmitBackoff = uretry.NewBackoffConfig( // Config stores Celestia DALC configuration parameters. type Config struct { - BaseURL string `json:"base_url"` - AppNodeURL string `json:"app_node_url"` - Timeout time.Duration `json:"timeout"` - GasPrices float64 `json:"gas_prices"` - NamespaceIDStr string `json:"namespace_id"` - AuthToken string `json:"auth_token"` - NamespaceID openrpcns.Namespace `json:"-"` + BaseURL string `json:"base_url,omitempty"` + AppNodeURL string `json:"app_node_url,omitempty"` + Timeout time.Duration `json:"timeout,omitempty"` + GasPrices float64 `json:"gas_prices,omitempty"` + NamespaceIDStr string `json:"namespace_id,omitempty"` + AuthToken string `json:"auth_token,omitempty"` + Backoff uretry.BackoffConfig `json:"backoff,omitempty"` + RetryAttempts int `json:"retry_attempts,omitempty"` + RetryDelay time.Duration `json:"retry_delay,omitempty"` + NamespaceID openrpcns.Namespace `json:"-"` } -var CelestiaDefaultConfig = Config{ +var TestConfig = Config{ BaseURL: "http://127.0.0.1:26658", AppNodeURL: "", Timeout: 5 * time.Second, - GasPrices: defaultGasPrices, + GasPrices: DefaultGasPrices, NamespaceIDStr: "", NamespaceID: openrpcns.Namespace{Version: namespaceVersion, ID: []byte{0, 0, 0, 0, 0, 0, 0, 0, 255, 255}}, } @@ -59,12 +61,12 @@ func (c *Config) InitNamespaceID() error { // Decode NamespaceID from string to byte array namespaceBytes, err := hex.DecodeString(c.NamespaceIDStr) if err != nil { - return err + return fmt.Errorf("decode string: %w", err) } // Check if NamespaceID is of correct length (10 bytes) if len(namespaceBytes) != openrpcns.NamespaceVersionZeroIDSize { - return fmt.Errorf("invalid namespace id length: %v must be %v", len(namespaceBytes), openrpcns.NamespaceVersionZeroIDSize) + return fmt.Errorf("wrong length: got: %v: expect %v", len(namespaceBytes), openrpcns.NamespaceVersionZeroIDSize) } ns, err := openrpcns.New(openrpcns.NamespaceVersionZero, append(openrpcns.NamespaceVersionZeroPrefix, namespaceBytes...)) diff --git a/da/celestia/config_test.go b/da/celestia/config_test.go new file mode 100644 index 000000000..cad82d7b2 --- /dev/null +++ b/da/celestia/config_test.go @@ -0,0 +1,63 @@ +package celestia + +import ( + "encoding/json" + "testing" + "time" + + "github.com/stretchr/testify/require" + + uretry "github.com/dymensionxyz/dymint/utils/retry" + + "github.com/stretchr/testify/assert" +) + +func TestCreateConfig(t *testing.T) { + mustMarshal := func(v any) []byte { + bz, _ := json.Marshal(v) + return bz + } + t.Run("simple", func(t *testing.T) { + c := Config{ + BaseURL: TestConfig.BaseURL, + AppNodeURL: TestConfig.AppNodeURL, + Timeout: TestConfig.Timeout, + GasPrices: 42, + Backoff: uretry.NewBackoffConfig(uretry.WithGrowthFactor(1.65)), + RetryAttempts: 10, + RetryDelay: 10 * time.Second, + } + bz := mustMarshal(c) + gotC, err := createConfig(bz) + require.NoError(t, err) + assert.Equal(t, c.Backoff, gotC.Backoff) + assert.Equal(t, c.RetryAttempts, gotC.RetryAttempts) + assert.Equal(t, c.RetryDelay, gotC.RetryDelay) + }) + t.Run("no backoff", func(t *testing.T) { + c := Config{ + BaseURL: TestConfig.BaseURL, + AppNodeURL: TestConfig.AppNodeURL, + Timeout: TestConfig.Timeout, + GasPrices: 42, + } + bz := mustMarshal(c) + gotC, err := createConfig(bz) + require.NoError(t, err) + assert.Equal(t, defaultSubmitBackoff, gotC.Backoff) + }) + t.Run("generate example", func(t *testing.T) { + c := Config{ + BaseURL: TestConfig.BaseURL, + AppNodeURL: TestConfig.AppNodeURL, + Timeout: TestConfig.Timeout, + GasPrices: 0.1, + AuthToken: "TOKEN", + Backoff: defaultSubmitBackoff, + RetryAttempts: 4, + RetryDelay: 3 * time.Second, + } + bz := mustMarshal(c) + t.Log(string(bz)) + }) +} diff --git a/da/celestia/rpc_test.go b/da/celestia/rpc_test.go index 11f622533..3667d2b83 100644 --- a/da/celestia/rpc_test.go +++ b/da/celestia/rpc_test.go @@ -52,7 +52,7 @@ func exampleNMT(nidSize int, ignoreMaxNamespace bool, leavesNIDs ...byte) *nmt.N func TestSubmitBatch(t *testing.T) { assert := assert.New(t) require := require.New(t) - configBytes, err := json.Marshal(celestia.CelestiaDefaultConfig) + configBytes, err := json.Marshal(celestia.TestConfig) require.NoError(err) batch := &types.Batch{ StartHeight: 0, @@ -103,7 +103,6 @@ func TestSubmitBatch(t *testing.T) { }, } for _, tc := range cases { - t.Log("Case name ", tc.name) // Create mock clients mockRPCClient := mocks.NewMockCelestiaRPCClient(t) @@ -112,7 +111,6 @@ func TestSubmitBatch(t *testing.T) { celestia.WithSubmitBackoff(uretry.NewBackoffConfig(uretry.WithInitialDelay(10*time.Millisecond), uretry.WithMaxDelay(10*time.Millisecond))), celestia.WithRPCClient(mockRPCClient), celestia.WithRPCAttempts(1), - celestia.WithRPCRetryDelay(10 * time.Millisecond), } // Subscribe to the health status event pubsubServer := pubsub.NewServer() diff --git a/da/da_test.go b/da/da_test.go index dddbee57c..50aa4c665 100644 --- a/da/da_test.go +++ b/da/da_test.go @@ -140,7 +140,7 @@ func doTestRetrieve(t *testing.T, dalc da.DataAvailabilityLayerClient) { config := celestia.Config{ BaseURL: "http://localhost:26658", Timeout: 30 * time.Second, - GasPrices: celestia.CelestiaDefaultConfig.GasPrices, + GasPrices: celestia.DefaultGasPrices, } err := config.InitNamespaceID() require.NoError(err) diff --git a/utils/retry/backoff.go b/utils/retry/backoff.go index a416e3ecc..05d7ac53d 100644 --- a/utils/retry/backoff.go +++ b/utils/retry/backoff.go @@ -12,9 +12,9 @@ const ( // BackoffConfig is a configuration for a backoff, it's used to create new instances type BackoffConfig struct { - InitialDelay time.Duration - MaxDelay time.Duration - GrowthFactor float64 + InitialDelay time.Duration `json:"initial_delay"` + MaxDelay time.Duration `json:"max_delay"` + GrowthFactor float64 `json:"growth_factor"` } // Backoff creates a new Backoff instance with the configuration (starting at 0 attempts made so far)