Skip to content

Commit

Permalink
feat(DA): expose configuration options for retry loop backoff (#757)
Browse files Browse the repository at this point in the history
Co-authored-by: github-actions <[email protected]>
Co-authored-by: zale144 <[email protected]>
Co-authored-by: Michael Tsitrin <[email protected]>
  • Loading branch information
4 people authored May 8, 2024
1 parent fe78b6d commit 0cd09f3
Show file tree
Hide file tree
Showing 9 changed files with 136 additions and 59 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* **logs:** make logs more readable in a couple places, fix race cond ([#749](https://github.com/dymensionxyz/dymint/issues/749)) ([f05ef39](https://github.com/dymensionxyz/dymint/commit/f05ef3957b754c05fbc90aa39eabce80bbe65933))
* **manager:** get fresh height in loop ([#781](https://github.com/dymensionxyz/dymint/issues/781)) ([e4df480](https://github.com/dymensionxyz/dymint/commit/e4df48037a78965dbac9e747dd296f39360e396c))
* **p2p:** validate block before applying and not before caching in p2p gossiping ([#723](https://github.com/dymensionxyz/dymint/issues/723)) ([98371b5](https://github.com/dymensionxyz/dymint/commit/98371b5220613e70f3274fab5593e02ba532f7db))
* **p2p:** validating gossiped block is created by the proposer ([#737](https://github.com/dymensionxyz/dymint/issues/737)) ([851b312](https://github.com/dymensionxyz/dymint/commit/851b312620233a9fb1abe55214a678322e7b0c68))
* **produce loop:** handle unauthenticated error in settlement layer ([#726](https://github.com/dymensionxyz/dymint/issues/726)) ([33e78d1](https://github.com/dymensionxyz/dymint/commit/33e78d116b5f14b91b8b3bda2b6cbfee9040e2d3))
* **rpc:** nil panic in rpc/json/handler.go WriteError ([#750](https://github.com/dymensionxyz/dymint/issues/750)) ([e09709b](https://github.com/dymensionxyz/dymint/commit/e09709b428a33da002defb9f13178fa19b81a69b))
* **settlement:** remove state index from proto ([#777](https://github.com/dymensionxyz/dymint/issues/777)) ([767b8fd](https://github.com/dymensionxyz/dymint/commit/767b8fdb490c37deee43ac023688410bbb98ccb0))
Expand Down
3 changes: 2 additions & 1 deletion config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ batch_submit_max_time = "{{ .BlockManagerConfig.BatchSubmitMaxTime }}"
### da config ###
da_layer = "{{ .DALayer }}" # mock, celestia, avail
namespace_id = "{{ .BlockManagerConfig.NamespaceID }}"
# this should be json matching the celestia.Config type
da_config = "{{ .DAConfig }}"
# max size of batch in bytes that can be accepted by DA
Expand All @@ -91,7 +92,7 @@ gossiped_blocks_cache_size = {{ .BlockManagerConfig.GossipedBlocksCacheSize }}
bootstrap_time = "{{ .BootstrapTime }}"
#celestia config example:
# da_config = "{\"base_url\": \"http://127.0.0.1:26658\", \"timeout\": 60000000000, \"gas_prices\":0.1, \"token\":\"TOKEN\"}"
# da_config = "{\"base_url\":\"http:\/\/127.0.0.1:26658\",\"timeout\":5000000000,\"gas_prices\":0.1,\"auth_token\":\"TOKEN\",\"backoff\":{\"initial_delay\":6000000000,\"max_delay\":6000000000,\"growth_factor\":2},\"retry_attempts\":4,\"retry_delay\":3000000000}"
# Avail config example:
# da_config = "{\"seed\": \"MNEMONIC\", \"api_url\": \"wss://kate.avail.tools/ws\", \"app_id\": 0, \"tip\":10}"
Expand Down
82 changes: 47 additions & 35 deletions da/celestia/celestia.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,11 @@ import (
type DataAvailabilityLayerClient struct {
rpc celtypes.CelestiaRPCClient

pubsubServer *pubsub.Server
config Config
logger types.Logger
ctx context.Context
cancel context.CancelFunc
rpcRetryDelay time.Duration
rpcRetryAttempts int
submitBackoff uretry.BackoffConfig
pubsubServer *pubsub.Server
config Config
logger types.Logger
ctx context.Context
cancel context.CancelFunc
}

var (
Expand All @@ -57,51 +54,37 @@ func WithRPCClient(rpc celtypes.CelestiaRPCClient) da.Option {
// WithRPCRetryDelay sets failed rpc calls retry delay.
func WithRPCRetryDelay(delay time.Duration) da.Option {
return func(daLayerClient da.DataAvailabilityLayerClient) {
daLayerClient.(*DataAvailabilityLayerClient).rpcRetryDelay = delay
daLayerClient.(*DataAvailabilityLayerClient).config.RetryDelay = delay
}
}

// WithRPCAttempts sets failed rpc calls retry attempts.
func WithRPCAttempts(attempts int) da.Option {
return func(daLayerClient da.DataAvailabilityLayerClient) {
daLayerClient.(*DataAvailabilityLayerClient).rpcRetryAttempts = attempts
daLayerClient.(*DataAvailabilityLayerClient).config.RetryAttempts = attempts
}
}

// WithSubmitBackoff sets submit retry delay config.
func WithSubmitBackoff(c uretry.BackoffConfig) da.Option {
return func(daLayerClient da.DataAvailabilityLayerClient) {
daLayerClient.(*DataAvailabilityLayerClient).submitBackoff = c
daLayerClient.(*DataAvailabilityLayerClient).config.Backoff = c
}
}

// Init initializes DataAvailabilityLayerClient instance.
func (c *DataAvailabilityLayerClient) Init(config []byte, pubsubServer *pubsub.Server, kvStore store.KVStore, logger types.Logger, options ...da.Option) error {
c.logger = logger

if len(config) <= 0 {
return errors.New("config is empty")
}
err := json.Unmarshal(config, &c.config)
if err != nil {
return err
}
err = (&c.config).InitNamespaceID()
var err error
c.config, err = createConfig(config)
if err != nil {
return err
return fmt.Errorf("create config: %w: %w", err, gerr.ErrInvalidArgument)
}

if c.config.GasPrices == 0 {
return errors.New("gas prices must be set")
}
c.ctx, c.cancel = context.WithCancel(context.Background())

c.pubsubServer = pubsubServer
// Set defaults
c.rpcRetryAttempts = defaultRpcCheckAttempts
c.rpcRetryDelay = defaultRpcRetryDelay
c.submitBackoff = defaultSubmitBackoff

c.ctx, c.cancel = context.WithCancel(context.Background())

// Apply options
for _, apply := range options {
Expand All @@ -111,6 +94,35 @@ func (c *DataAvailabilityLayerClient) Init(config []byte, pubsubServer *pubsub.S
return nil
}

func createConfig(bz []byte) (c Config, err error) {
if len(bz) <= 0 {
return c, errors.New("supplied config is empty")
}
err = json.Unmarshal(bz, &c)
if err != nil {
return c, fmt.Errorf("json unmarshal: %w", err)
}

err = c.InitNamespaceID()
if err != nil {
return c, fmt.Errorf("init namespace id: %w", err)
}

if c.GasPrices == 0 {
return c, errors.New("gas prices must be set")
}

// NOTE: 0 is valid value for RetryAttempts

if c.RetryDelay == 0 {
c.RetryDelay = defaultRpcRetryDelay
}
if c.Backoff == (uretry.BackoffConfig{}) {
c.Backoff = defaultSubmitBackoff
}
return c, nil
}

// Start prepares DataAvailabilityLayerClient to work.
func (c *DataAvailabilityLayerClient) Start() (err error) {
c.logger.Info("starting Celestia Data Availability Layer Client")
Expand Down Expand Up @@ -200,7 +212,7 @@ func (c *DataAvailabilityLayerClient) SubmitBatch(batch *types.Batch) da.ResultS
}
}

backoff := c.submitBackoff.Backoff()
backoff := c.config.Backoff.Backoff()

for {
select {
Expand Down Expand Up @@ -294,9 +306,9 @@ func (c *DataAvailabilityLayerClient) RetrieveBatches(daMetaData *da.DASubmitMet

return nil
},
retry.Attempts(uint(c.rpcRetryAttempts)),
retry.Attempts(uint(c.config.RetryAttempts)),
retry.DelayType(retry.FixedDelay),
retry.Delay(c.rpcRetryDelay),
retry.Delay(c.config.RetryDelay),
)
if err != nil {
c.logger.Error("RetrieveBatches process failed", "error", err)
Expand Down Expand Up @@ -423,7 +435,7 @@ func (c *DataAvailabilityLayerClient) CheckBatchAvailability(daMetaData *da.DASu
}

return nil
}, retry.Attempts(uint(c.rpcRetryAttempts)), retry.DelayType(retry.FixedDelay), retry.Delay(c.rpcRetryDelay))
}, retry.Attempts(uint(c.config.RetryAttempts)), retry.DelayType(retry.FixedDelay), retry.Delay(c.config.RetryDelay))
if err != nil {
c.logger.Error("CheckAvailability process failed", "error", err)
}
Expand Down Expand Up @@ -570,8 +582,8 @@ func (c *DataAvailabilityLayerClient) submit(daBlob da.Blob) (uint64, da.Commitm
},
retry.Context(c.ctx),
retry.LastErrorOnly(true),
retry.Delay(c.rpcRetryDelay),
retry.Attempts(uint(c.rpcRetryAttempts)),
retry.Delay(c.config.RetryDelay),
retry.Attempts(uint(c.config.RetryAttempts)),
retry.DelayType(retry.FixedDelay),
)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion da/celestia/celestia_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ func setDAandMock(t *testing.T) (*mocks.MockCelestiaRPCClient, da.DataAvailabili
config := celestia.Config{
BaseURL: "http://localhost:26658",
Timeout: 30 * time.Second,
GasPrices: celestia.CelestiaDefaultConfig.GasPrices,
GasPrices: celestia.DefaultGasPrices,
NamespaceIDStr: "0000000000000000ffff",
}
err = config.InitNamespaceID()
Expand Down
32 changes: 17 additions & 15 deletions da/celestia/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@ import (
)

const (
defaultRpcRetryDelay = 1 * time.Second
defaultRpcCheckAttempts = 10
namespaceVersion = 0
defaultGasPrices = 0.1
defaultRpcRetryDelay = 3 * time.Second
namespaceVersion = 0
DefaultGasPrices = 0.1
)

var defaultSubmitBackoff = uretry.NewBackoffConfig(
Expand All @@ -25,20 +24,23 @@ var defaultSubmitBackoff = uretry.NewBackoffConfig(

// Config stores Celestia DALC configuration parameters.
type Config struct {
BaseURL string `json:"base_url"`
AppNodeURL string `json:"app_node_url"`
Timeout time.Duration `json:"timeout"`
GasPrices float64 `json:"gas_prices"`
NamespaceIDStr string `json:"namespace_id"`
AuthToken string `json:"auth_token"`
NamespaceID openrpcns.Namespace `json:"-"`
BaseURL string `json:"base_url,omitempty"`
AppNodeURL string `json:"app_node_url,omitempty"`
Timeout time.Duration `json:"timeout,omitempty"`
GasPrices float64 `json:"gas_prices,omitempty"`
NamespaceIDStr string `json:"namespace_id,omitempty"`
AuthToken string `json:"auth_token,omitempty"`
Backoff uretry.BackoffConfig `json:"backoff,omitempty"`
RetryAttempts int `json:"retry_attempts,omitempty"`
RetryDelay time.Duration `json:"retry_delay,omitempty"`
NamespaceID openrpcns.Namespace `json:"-"`
}

var CelestiaDefaultConfig = Config{
var TestConfig = Config{
BaseURL: "http://127.0.0.1:26658",
AppNodeURL: "",
Timeout: 5 * time.Second,
GasPrices: defaultGasPrices,
GasPrices: DefaultGasPrices,
NamespaceIDStr: "",
NamespaceID: openrpcns.Namespace{Version: namespaceVersion, ID: []byte{0, 0, 0, 0, 0, 0, 0, 0, 255, 255}},
}
Expand All @@ -59,12 +61,12 @@ func (c *Config) InitNamespaceID() error {
// Decode NamespaceID from string to byte array
namespaceBytes, err := hex.DecodeString(c.NamespaceIDStr)
if err != nil {
return err
return fmt.Errorf("decode string: %w", err)
}

// Check if NamespaceID is of correct length (10 bytes)
if len(namespaceBytes) != openrpcns.NamespaceVersionZeroIDSize {
return fmt.Errorf("invalid namespace id length: %v must be %v", len(namespaceBytes), openrpcns.NamespaceVersionZeroIDSize)
return fmt.Errorf("wrong length: got: %v: expect %v", len(namespaceBytes), openrpcns.NamespaceVersionZeroIDSize)
}

ns, err := openrpcns.New(openrpcns.NamespaceVersionZero, append(openrpcns.NamespaceVersionZeroPrefix, namespaceBytes...))
Expand Down
63 changes: 63 additions & 0 deletions da/celestia/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package celestia

import (
"encoding/json"
"testing"
"time"

"github.com/stretchr/testify/require"

uretry "github.com/dymensionxyz/dymint/utils/retry"

"github.com/stretchr/testify/assert"
)

func TestCreateConfig(t *testing.T) {
mustMarshal := func(v any) []byte {
bz, _ := json.Marshal(v)
return bz
}
t.Run("simple", func(t *testing.T) {
c := Config{
BaseURL: TestConfig.BaseURL,
AppNodeURL: TestConfig.AppNodeURL,
Timeout: TestConfig.Timeout,
GasPrices: 42,
Backoff: uretry.NewBackoffConfig(uretry.WithGrowthFactor(1.65)),
RetryAttempts: 10,
RetryDelay: 10 * time.Second,
}
bz := mustMarshal(c)
gotC, err := createConfig(bz)
require.NoError(t, err)
assert.Equal(t, c.Backoff, gotC.Backoff)
assert.Equal(t, c.RetryAttempts, gotC.RetryAttempts)
assert.Equal(t, c.RetryDelay, gotC.RetryDelay)
})
t.Run("no backoff", func(t *testing.T) {
c := Config{
BaseURL: TestConfig.BaseURL,
AppNodeURL: TestConfig.AppNodeURL,
Timeout: TestConfig.Timeout,
GasPrices: 42,
}
bz := mustMarshal(c)
gotC, err := createConfig(bz)
require.NoError(t, err)
assert.Equal(t, defaultSubmitBackoff, gotC.Backoff)
})
t.Run("generate example", func(t *testing.T) {
c := Config{
BaseURL: TestConfig.BaseURL,
AppNodeURL: TestConfig.AppNodeURL,
Timeout: TestConfig.Timeout,
GasPrices: 0.1,
AuthToken: "TOKEN",
Backoff: defaultSubmitBackoff,
RetryAttempts: 4,
RetryDelay: 3 * time.Second,
}
bz := mustMarshal(c)
t.Log(string(bz))
})
}
4 changes: 1 addition & 3 deletions da/celestia/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func exampleNMT(nidSize int, ignoreMaxNamespace bool, leavesNIDs ...byte) *nmt.N
func TestSubmitBatch(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
configBytes, err := json.Marshal(celestia.CelestiaDefaultConfig)
configBytes, err := json.Marshal(celestia.TestConfig)
require.NoError(err)
batch := &types.Batch{
StartHeight: 0,
Expand Down Expand Up @@ -103,7 +103,6 @@ func TestSubmitBatch(t *testing.T) {
},
}
for _, tc := range cases {

t.Log("Case name ", tc.name)
// Create mock clients
mockRPCClient := mocks.NewMockCelestiaRPCClient(t)
Expand All @@ -112,7 +111,6 @@ func TestSubmitBatch(t *testing.T) {
celestia.WithSubmitBackoff(uretry.NewBackoffConfig(uretry.WithInitialDelay(10*time.Millisecond), uretry.WithMaxDelay(10*time.Millisecond))),
celestia.WithRPCClient(mockRPCClient),
celestia.WithRPCAttempts(1),
celestia.WithRPCRetryDelay(10 * time.Millisecond),
}
// Subscribe to the health status event
pubsubServer := pubsub.NewServer()
Expand Down
2 changes: 1 addition & 1 deletion da/da_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func doTestRetrieve(t *testing.T, dalc da.DataAvailabilityLayerClient) {
config := celestia.Config{
BaseURL: "http://localhost:26658",
Timeout: 30 * time.Second,
GasPrices: celestia.CelestiaDefaultConfig.GasPrices,
GasPrices: celestia.DefaultGasPrices,
}
err := config.InitNamespaceID()
require.NoError(err)
Expand Down
6 changes: 3 additions & 3 deletions utils/retry/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ const (

// BackoffConfig is a configuration for a backoff, it's used to create new instances
type BackoffConfig struct {
InitialDelay time.Duration
MaxDelay time.Duration
GrowthFactor float64
InitialDelay time.Duration `json:"initial_delay"`
MaxDelay time.Duration `json:"max_delay"`
GrowthFactor float64 `json:"growth_factor"`
}

// Backoff creates a new Backoff instance with the configuration (starting at 0 attempts made so far)
Expand Down

0 comments on commit 0cd09f3

Please sign in to comment.