Skip to content

Commit

Permalink
feat(da): adjustments after manual testing
Browse files Browse the repository at this point in the history
  • Loading branch information
keruch committed Jul 3, 2024
1 parent ec5ff74 commit 0770e35
Show file tree
Hide file tree
Showing 6 changed files with 148 additions and 30 deletions.
16 changes: 14 additions & 2 deletions da/interchain/chain_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"

"github.com/cosmos/cosmos-sdk/client/flags"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/types/tx"
"github.com/dymensionxyz/cosmosclient/cosmosclient"
"github.com/ignite/cli/ignite/pkg/cosmosaccount"
"github.com/tendermint/tendermint/libs/bytes"
Expand All @@ -17,6 +19,7 @@ import (
type daClient struct {
cosmosclient.Client
queryClient interchainda.QueryClient
txService tx.ServiceClient
}

func newDAClient(ctx context.Context, config DAConfig) (*daClient, error) {
Expand All @@ -27,6 +30,7 @@ func newDAClient(ctx context.Context, config DAConfig) (*daClient, error) {
cosmosclient.WithFees(config.GasFees),
cosmosclient.WithGasLimit(config.GasLimit),
cosmosclient.WithGasPrices(config.GasPrices),
cosmosclient.WithGasAdjustment(config.GasAdjustment),
cosmosclient.WithKeyringBackend(cosmosaccount.KeyringBackend(config.KeyringBackend)),
cosmosclient.WithHome(config.KeyringHomeDir),
}...)
Expand All @@ -36,19 +40,27 @@ func newDAClient(ctx context.Context, config DAConfig) (*daClient, error) {
return &daClient{
Client: c,
queryClient: interchainda.NewQueryClient(c.Context().GRPCClient),
txService: tx.NewServiceClient(c.Context()),
}, nil
}

func (c *daClient) Params(ctx context.Context) (interchainda.Params, error) {
return interchainda.Params{
CostPerByte: sdk.NewInt64Coin(sdk.DefaultBondDenom, 1),
MaxBlobSize: 999999999,
DisputePeriod: 200,
}, nil

// TODO: uncomment when we find a workaround on how to initialize the interchain da query client
resp, err := c.queryClient.Params(ctx, &interchainda.QueryParamsRequest{})
if err != nil {
return interchainda.Params{}, fmt.Errorf("can't query DA layer params: %w", err)
}
return resp.GetParams(), nil
}

func (c *daClient) Tx(ctx context.Context, txHash []byte) (*ctypes.ResultTx, error) {
return c.RPC.Tx(ctx, txHash, false)
func (c *daClient) GetTx(ctx context.Context, txHash string) (*tx.GetTxResponse, error) {
return c.txService.GetTx(ctx, &tx.GetTxRequest{Hash: txHash})
}

func (c *daClient) ABCIQueryWithProof(
Expand Down
35 changes: 21 additions & 14 deletions da/interchain/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,13 @@ type DAConfig struct {
NodeAddress string `mapstructure:"node_address" json:"node_address,omitempty"`
GasLimit uint64 `mapstructure:"gas_limit" json:"gas_limit,omitempty"`
GasPrices string `mapstructure:"gas_prices" json:"gas_prices,omitempty"`
GasAdjustment float64 `mapstructure:"gas_adjustment" json:"gas_adjustment,omitempty"`
GasFees string `mapstructure:"gas_fees" json:"gas_fees,omitempty"`
DAParams interchainda.Params `mapstructure:"da_params" json:"da_params"`

BatchAcceptanceTimeout time.Duration `mapstructure:"batch_acceptance_timeout" json:"batch_acceptance_timeout"`
BatchAcceptanceAttempts uint `mapstructure:"batch_acceptance_attempts" json:"batch_acceptance_attempts"`

RetryMinDelay time.Duration `mapstructure:"retry_min_delay" json:"retry_min_delay,omitempty"`
RetryMaxDelay time.Duration `mapstructure:"retry_min_delay" json:"retry_max_delay,omitempty"`
RetryAttempts uint `mapstructure:"retry_attempts" json:"retry_attempts,omitempty"`
Expand Down Expand Up @@ -91,19 +95,22 @@ func DefaultDAConfig() DAConfig {
home, _ := os.UserHomeDir()
keyringHomeDir := filepath.Join(home, ".simapp")
return DAConfig{
ClientID: "dym-interchain",
ChainID: "interchain-da-test",
KeyringBackend: keyring.BackendTest,
KeyringHomeDir: keyringHomeDir,
AddressPrefix: sdk.Bech32MainPrefix,
AccountName: "sequencer",
NodeAddress: "http://127.0.0.1:36657",
GasLimit: 0,
GasPrices: "10stake",
GasFees: "",
DAParams: interchainda.Params{},
RetryMinDelay: 100 * time.Millisecond,
RetryMaxDelay: 2 * time.Second,
RetryAttempts: 10,
ClientID: "dym-interchain",
ChainID: "my-test-chain",
KeyringBackend: keyring.BackendTest,
KeyringHomeDir: keyringHomeDir,
AddressPrefix: sdk.Bech32MainPrefix,
AccountName: "sequencer",
NodeAddress: "http://127.0.0.1:26657",
GasLimit: 0,
GasPrices: "10stake",
GasAdjustment: 1.1,
GasFees: "",
DAParams: interchainda.Params{},
BatchAcceptanceTimeout: 5 * time.Second,
BatchAcceptanceAttempts: 10,
RetryMinDelay: 100 * time.Millisecond,
RetryMaxDelay: 2 * time.Second,
RetryAttempts: 10,
}
}
24 changes: 16 additions & 8 deletions da/interchain/interchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
cdctypes "github.com/cosmos/cosmos-sdk/codec/types"
cryptocodec "github.com/cosmos/cosmos-sdk/crypto/codec"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/types/tx"
"github.com/dymensionxyz/cosmosclient/cosmosclient"
"github.com/tendermint/tendermint/libs/bytes"
"github.com/tendermint/tendermint/libs/pubsub"
Expand All @@ -28,9 +29,9 @@ var (

type DAClient interface {
Context() sdkclient.Context
BroadcastTx(accountName string, msgs ...sdk.Msg) (cosmosclient.Response, error)
Params(ctx context.Context) (interchainda.Params, error)
Tx(ctx context.Context, txHash []byte) (*ctypes.ResultTx, error)
BroadcastTx(string, ...sdk.Msg) (cosmosclient.Response, error)
Params(context.Context) (interchainda.Params, error)
GetTx(context.Context, string) (*tx.GetTxResponse, error)
ABCIQueryWithProof(ctx context.Context, path string, data bytes.HexBytes, height int64) (*ctypes.ResultABCIQuery, error)
}

Expand All @@ -42,8 +43,9 @@ type DALayerClient struct {
cdc codec.Codec
synced chan struct{}

daClient DAClient
daConfig DAConfig
accountAddress string // address of the sequencer in the DA layer
daClient DAClient
daConfig DAConfig
}

// Init is called once. It reads the DA client configuration and initializes resources for the interchain DA provider.
Expand Down Expand Up @@ -75,21 +77,27 @@ func (c *DALayerClient) Init(rawConfig []byte, _ *pubsub.Server, _ store.KV, log
}
config.DAParams = daParams

// Create cancellable context
ctx, cancel := context.WithCancel(ctx)

// Create codec
interfaceRegistry := cdctypes.NewInterfaceRegistry()
cryptocodec.RegisterInterfaces(interfaceRegistry)
interfaceRegistry.RegisterImplementations(&interchainda.MsgSubmitBlob{})
cdc := codec.NewProtoCodec(interfaceRegistry)

addr, err := client.Address(config.AccountName)
if err != nil {
return fmt.Errorf("cannot get '%s' account address from the provided keyring: %w", config.AccountName, err)
}

// Create cancellable context
ctx, cancel := context.WithCancel(ctx)

// Fill client fields
c.logger = logger
c.ctx = ctx
c.cancel = cancel
c.cdc = cdc
c.synced = make(chan struct{})
c.accountAddress = addr.String()
c.daClient = client
c.daConfig = config

Expand Down
34 changes: 34 additions & 0 deletions da/interchain/interchain_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package interchain_test

import (
"encoding/json"
"os"
"testing"

"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/libs/log"

"github.com/dymensionxyz/dymint/da/interchain"
"github.com/dymensionxyz/dymint/types"
)

// TODO: add interchain DA chain mock
func TestDALayerClient_Init(t *testing.T) {
client := new(interchain.DALayerClient)
config := interchain.DefaultDAConfig()
rawConfig, err := json.Marshal(config)
require.NoError(t, err)
logger := log.NewTMLogger(log.NewSyncWriter(os.Stdout))

err = client.Init(rawConfig, nil, nil, logger)
require.NoError(t, err)

result := client.SubmitBatchV2(&types.Batch{
StartHeight: 1,
EndHeight: 3,
Blocks: []*types.Block{{Header: types.Header{Height: 1}}},
Commits: []*types.Commit{{Height: 1}},
})
require.NoError(t, result.Error)
t.Logf("result: %#v", result)
}
66 changes: 61 additions & 5 deletions da/interchain/submit_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package interchain

import (
"fmt"
"time"

"cosmossdk.io/collections"
collcodec "cosmossdk.io/collections/codec"
"github.com/avast/retry-go/v4"
cdctypes "github.com/cosmos/cosmos-sdk/codec/types"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/types/tx"
"github.com/dymensionxyz/cosmosclient/cosmosclient"

"github.com/dymensionxyz/dymint/da"
Expand Down Expand Up @@ -83,7 +85,7 @@ func (c *DALayerClient) submitBatch(batch *types.Batch) (*interchainda.Commitmen

// Prepare the message to be sent to the DA layer
msg := interchainda.MsgSubmitBlob{
Creator: c.daConfig.AccountName,
Creator: c.accountAddress,
Blob: gzipped,
Fees: feesToPay,
}
Expand All @@ -98,8 +100,18 @@ func (c *DALayerClient) submitBatch(batch *types.Batch) (*interchainda.Commitmen
return nil, fmt.Errorf("can't broadcast MsgSubmitBlob to DA layer: %w", err)
}

// Decode the response
// Wait until the tx in included into the DA layer
rawResp, err := c.waitResponse(txResp.TxHash)
if err != nil {
return nil, fmt.Errorf("can't check acceptance of the blob to DA layer: %w", err)
}
if rawResp.TxResponse.Code != 0 {
return nil, fmt.Errorf("MsgSubmitBlob is not executed in DA layer (code %d): %s", rawResp.TxResponse.Code, rawResp.TxResponse.RawLog)
}

// cosmosclient.Response has convenient Decode method, so we reuse txResp to reuse it
var resp interchainda.MsgSubmitBlobResponse
txResp.TxResponse = rawResp.TxResponse
err = txResp.Decode(&resp)
if err != nil {
return nil, fmt.Errorf("can't decode MsgSubmitBlob response: %w", err)
Expand All @@ -114,11 +126,18 @@ func (c *DALayerClient) submitBatch(batch *types.Batch) (*interchainda.Commitmen
if err != nil {
return nil, fmt.Errorf("can't encode DA lakey store key: %w", err)
}
const keyPath = "/key"
abciResp, err := c.daClient.ABCIQueryWithProof(c.ctx, keyPath, key, txResp.Height)
abciPath := fmt.Sprintf("/store/%s/key", interchainda.StoreKey)
abciResp, err := c.daClient.ABCIQueryWithProof(c.ctx, abciPath, key, txResp.Height)
if err != nil {
return nil, fmt.Errorf("can't call ABCI query with proof for the BlobID %d: %w", resp.BlobId, err)
}
if abciResp.Response.IsErr() {
return nil, fmt.Errorf("can't call ABCI query with proof for blob ID %d (code %d): %s",
resp.BlobId, abciResp.Response.Code, abciResp.Response.Log)
}
if abciResp.Response.Value == nil {
return nil, fmt.Errorf("ABCI query with proof for blob ID %d returned nil value", resp.BlobId)
}

return &interchainda.Commitment{
ClientId: c.daConfig.ClientID,
Expand All @@ -135,7 +154,7 @@ func (c *DALayerClient) broadcastTx(msgs ...sdk.Msg) (cosmosclient.Response, err
return cosmosclient.Response{}, fmt.Errorf("can't broadcast MsgSubmitBlob to the DA layer: %w", err)
}
if txResp.Code != 0 {
return cosmosclient.Response{}, fmt.Errorf("MsgSubmitBlob broadcast tx status code is not 0: code %d", txResp.Code)
return cosmosclient.Response{}, fmt.Errorf("MsgSubmitBlob broadcast tx status code is not 0 (code %d): %s", txResp.Code, txResp.RawLog)
}
return txResp, nil
}
Expand All @@ -152,3 +171,40 @@ func (c *DALayerClient) runWithRetry(operation func() error) error {
retry.DelayType(retry.BackOffDelay),
)
}

func (c *DALayerClient) waitResponse(txHash string) (*tx.GetTxResponse, error) {
timer := time.NewTicker(c.daConfig.BatchAcceptanceTimeout)
defer timer.Stop()

var txResp *tx.GetTxResponse
attempt := uint(0)

// First try then wait for the BatchAcceptanceTimeout
for {
err := c.runWithRetry(func() error {
var errX error
txResp, errX = c.daClient.GetTx(c.ctx, txHash)
return errX
})
if err == nil {
return txResp, nil
}

c.logger.Error("Can't check batch acceptance",
"attempt", attempt, "max_attempts", c.daConfig.BatchAcceptanceAttempts, "error", err)

attempt++
if attempt > c.daConfig.BatchAcceptanceAttempts {
return nil, fmt.Errorf("can't check batch acceptance after all attempts")
}

// Wait for the timeout
select {
case <-c.ctx.Done():
return nil, c.ctx.Err()

case <-timer.C:
continue
}
}
}
3 changes: 2 additions & 1 deletion da/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/dymensionxyz/dymint/da/avail"
"github.com/dymensionxyz/dymint/da/celestia"
"github.com/dymensionxyz/dymint/da/grpc"
"github.com/dymensionxyz/dymint/da/interchain"
"github.com/dymensionxyz/dymint/da/local"
)

Expand All @@ -14,7 +15,7 @@ var clients = map[string]func() da.DataAvailabilityLayerClient{
"grpc": func() da.DataAvailabilityLayerClient { return &grpc.DataAvailabilityLayerClient{} },
"celestia": func() da.DataAvailabilityLayerClient { return &celestia.DataAvailabilityLayerClient{} },
"avail": func() da.DataAvailabilityLayerClient { return &avail.DataAvailabilityLayerClient{} },
"interchain": func() da.DataAvailabilityLayerClient { return &avail.DataAvailabilityLayerClient{} },
"interchain": func() da.DataAvailabilityLayerClient { return &interchain.DALayerClient{} },
}

// GetClient returns client identified by name.
Expand Down

0 comments on commit 0770e35

Please sign in to comment.