From bb07915bc41ddeebf1df2b3edfd1ab48464ffbfc Mon Sep 17 00:00:00 2001 From: keruch Date: Thu, 27 Jun 2024 21:03:10 +0200 Subject: [PATCH] feat(da): SubmitBatch method --- da/interchain/chain_client.go | 33 ++++++-- da/interchain/config.go | 30 +++---- da/interchain/interchain.go | 153 +++++++++++++++++++--------------- types/pb/interchain_da/da.go | 13 +++ 4 files changed, 135 insertions(+), 94 deletions(-) create mode 100644 types/pb/interchain_da/da.go diff --git a/da/interchain/chain_client.go b/da/interchain/chain_client.go index e63e743bc..a2814342c 100644 --- a/da/interchain/chain_client.go +++ b/da/interchain/chain_client.go @@ -1,20 +1,23 @@ package interchain import ( - sdkclient "github.com/cosmos/cosmos-sdk/client" + "context" + "fmt" + "github.com/cosmos/cosmos-sdk/client/flags" - sdktypes "github.com/cosmos/cosmos-sdk/types" "github.com/dymensionxyz/cosmosclient/cosmosclient" "github.com/ignite/cli/ignite/pkg/cosmosaccount" + + interchainda "github.com/dymensionxyz/dymint/types/pb/interchain_da" ) -type DAClient interface { - Context() sdkclient.Context - BroadcastTx(accountName string, msgs ...sdktypes.Msg) (cosmosclient.Response, error) +type daClient struct { + cosmosclient.Client + queryClient interchainda.QueryClient } -func getCosmosClientOptions(config DAConfig) []cosmosclient.Option { - options := []cosmosclient.Option{ +func newDAClient(ctx context.Context, config DAConfig) (*daClient, error) { + c, err := cosmosclient.New(ctx, []cosmosclient.Option{ cosmosclient.WithAddressPrefix(config.AddressPrefix), cosmosclient.WithBroadcastMode(flags.BroadcastSync), cosmosclient.WithNodeAddress(config.NodeAddress), @@ -23,6 +26,20 @@ func getCosmosClientOptions(config DAConfig) []cosmosclient.Option { cosmosclient.WithGasPrices(config.GasPrices), cosmosclient.WithKeyringBackend(cosmosaccount.KeyringBackend(config.KeyringBackend)), cosmosclient.WithHome(config.KeyringHomeDir), + }...) + if err != nil { + return nil, fmt.Errorf("can't create DA layer cosmos client: %w", err) + } + return &daClient{ + Client: c, + queryClient: interchainda.NewQueryClient(c.Context().GRPCClient), + }, nil +} + +func (c *daClient) Params(ctx context.Context) (interchainda.Params, error) { + resp, err := c.queryClient.Params(ctx, &interchainda.QueryParamsRequest{}) + if err != nil { + return interchainda.Params{}, fmt.Errorf("can't query DA layer params: %w", err) } - return options + return resp.GetParams(), nil } diff --git a/da/interchain/config.go b/da/interchain/config.go index e2a27f2e0..cf8f982c6 100644 --- a/da/interchain/config.go +++ b/da/interchain/config.go @@ -1,25 +1,19 @@ package interchain import ( - "github.com/cosmos/cosmos-sdk/client" - "github.com/cosmos/cosmos-sdk/codec" + interchainda "github.com/dymensionxyz/dymint/types/pb/interchain_da" ) type DAConfig struct { - ChainID string `mapstructure:"chain_id"` // The chain ID of the DA chain - ClientID string `mapstructure:"client_id"` // This is the IBC client ID on Dymension hub for the DA chain - KeyringBackend string `mapstructure:"keyring_backend"` - KeyringHomeDir string `mapstructure:"keyring_home_dir"` - AddressPrefix string `mapstructure:"da_address_prefix"` - AccountName string `mapstructure:"da_account_name"` - NodeAddress string `mapstructure:"da_node_address"` - GasLimit uint64 `mapstructure:"da_gas_limit"` - GasPrices string `mapstructure:"da_gas_prices"` - GasFees string `mapstructure:"da_gas_fees"` - ChainParams interchaindatypes.Params `mapstructure:"chain_params"` // The params of the DA chain -} - -type EncodingConfig interface { - TxConfig() client.TxConfig - Codec() codec.Codec + ClientID string `mapstructure:"client_id"` // This is the IBC client ID on Dymension hub for the DA chain + ChainID string `mapstructure:"chain_id"` // The chain ID of the DA chain + KeyringBackend string `mapstructure:"keyring_backend"` + KeyringHomeDir string `mapstructure:"keyring_home_dir"` + AddressPrefix string `mapstructure:"address_prefix"` + AccountName string `mapstructure:"account_name"` + NodeAddress string `mapstructure:"node_address"` + GasLimit uint64 `mapstructure:"gas_limit"` + GasPrices string `mapstructure:"gas_prices"` + GasFees string `mapstructure:"gas_fees"` + DAParams interchainda.Params `mapstructure:"da_params"` } diff --git a/da/interchain/interchain.go b/da/interchain/interchain.go index ebc6a7521..b56e3b814 100644 --- a/da/interchain/interchain.go +++ b/da/interchain/interchain.go @@ -5,11 +5,11 @@ import ( "encoding/json" "fmt" + sdkclient "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/codec" cdctypes "github.com/cosmos/cosmos-sdk/codec/types" cryptocodec "github.com/cosmos/cosmos-sdk/crypto/codec" sdk "github.com/cosmos/cosmos-sdk/types" - "github.com/cosmos/cosmos-sdk/types/tx" "github.com/dymensionxyz/cosmosclient/cosmosclient" "github.com/tendermint/tendermint/libs/pubsub" @@ -17,6 +17,7 @@ import ( "github.com/dymensionxyz/dymint/settlement/dymension" "github.com/dymensionxyz/dymint/store" "github.com/dymensionxyz/dymint/types" + interchainda "github.com/dymensionxyz/dymint/types/pb/interchain_da" ) var ( @@ -24,6 +25,12 @@ var ( _ da.BatchRetriever = &DataAvailabilityLayerClient{} ) +type DAClient interface { + Context() sdkclient.Context + BroadcastTx(accountName string, msgs ...sdk.Msg) (cosmosclient.Response, error) + Params(ctx context.Context) (interchainda.Params, error) +} + // DataAvailabilityLayerClient is a client for DA-provider blockchains supporting the interchain-da module. type DataAvailabilityLayerClient struct { logger types.Logger @@ -32,42 +39,56 @@ type DataAvailabilityLayerClient struct { cdc codec.Codec synced chan struct{} - pubsubServer *pubsub.Server + pubsubServer *pubsub.Server - daClient DAClient - daConfig DAConfig - encodingConfig EncodingConfig // The DA chain's encoding config + daClient DAClient + daConfig DAConfig } // Init is called once. It reads the DA client configuration and initializes resources for the interchain DA provider. func (c *DataAvailabilityLayerClient) Init(rawConfig []byte, server *pubsub.Server, _ store.KV, logger types.Logger, options ...da.Option) error { + ctx := context.Background() + + // Read DA layer config var config DAConfig err := json.Unmarshal(rawConfig, &config) if err != nil { return fmt.Errorf("invalid config: %w", err) } + // Create cosmos client with DA layer + client, err := newDAClient(ctx, config) + if err != nil { + return fmt.Errorf("can't create DA layer client: %w", err) + } + + // Query DA layer interchain-da module params + daParams, err := client.Params(ctx) + if err != nil { + return fmt.Errorf("can't query DA layer interchain-da module params: %w", err) + } + config.DAParams = daParams + + // Create cancellable context + ctx, cancel := context.WithCancel(ctx) + + // Create codec interfaceRegistry := cdctypes.NewInterfaceRegistry() cryptocodec.RegisterInterfaces(interfaceRegistry) - interchaindatypes.RegisterInterfaces(interfaceRegistry) + interfaceRegistry.RegisterImplementations(&interchainda.MsgSubmitBlob{}) cdc := codec.NewProtoCodec(interfaceRegistry) - ctx, cancel := context.WithCancel(context.Background()) - + // Fill client fields c.logger = logger c.ctx = ctx c.cancel = cancel c.cdc = cdc c.synced = make(chan struct{}) c.pubsubServer = server - c.daConfig = config - - client, err := cosmosclient.New(ctx, getCosmosClientOptions(config)..., ) - if err != nil { - return fmt.Errorf("can't create DA layer client: %w", err) - } c.daClient = client + c.daConfig = config + // Apply client options for _, apply := range options { apply(c) } @@ -79,9 +100,6 @@ func (c *DataAvailabilityLayerClient) Init(rawConfig []byte, server *pubsub.Serv // It fetches the latest interchain module parameters and sets up a subscription to receive updates when the provider updates these parameters. // This ensures that the client is always up-to-date. func (c *DataAvailabilityLayerClient) Start() error { - // Get the module parameters from the chain - c.daConfig.ChainParams = c.grpc.GetModuleParams() - // Get the connectionID from the dymension hub for the da chain c.daConfig.ClientID = dymension.(c.chainConfig.ChainID) @@ -97,9 +115,9 @@ func (c *DataAvailabilityLayerClient) Start() error { // Stop is called once, when DataAvailabilityLayerClient is no longer needed. func (c *DataAvailabilityLayerClient) Stop() error { - c.daClient.Close() c.pubsubServer.Stop() c.cancel() + return nil } // Synced returns channel for on sync event @@ -112,81 +130,80 @@ func (c *DataAvailabilityLayerClient) GetClientType() da.Client { } func (c *DataAvailabilityLayerClient) SubmitBatch(batch *types.Batch) da.ResultSubmitBatch { - blob, err := batch.MarshalBinary() + result, err := c.submitBatch(batch) if err != nil { return da.ResultSubmitBatch{ - BaseResult: da.BaseResult{Code: da.StatusError, Message: err.Error(), Error: err}, + BaseResult: da.BaseResult{ + Code: da.StatusError, + Message: err.Error(), + Error: err, + }, + SubmitMetaData: nil, } } - - if len(blob) > int(c.daConfig.ChainParams.MaxBlobSize) { - return da.ResultSubmitBatch{ - BaseResult: da.BaseResult{Code: da.StatusError, Message: err.Error(), Error: err}, - } + return da.ResultSubmitBatch{ + BaseResult: da.BaseResult{ + Code: da.StatusSuccess, + Message: "Submission successful", + }, + SubmitMetaData: &da.DASubmitMetaData{ + Height: height, + Namespace: c.config.NamespaceID.Bytes(), + Client: da.Celestia, + Commitment: commitment, + Index: 0, + Length: 0, + Root: nil, + }, } +} - // submit the blob to da chain +type submitBatchResult struct { + BlobID uint64 + BlobHash string +} + +func (c *DataAvailabilityLayerClient) submitBatch(batch *types.Batch) (submitBatchResult, error) { + blob, err := batch.MarshalBinary() + if err != nil { + return submitBatchResult{}, fmt.Errorf("can't marshal batch: %w", err) + } - // calculate the da fees to pay - // feesToPay = params.CostPerByte * len(blob) - feesToPay := sdk.NewCoin(c.daConfig.ChainParams.CostPerByte.Denom, c.daConfig.ChainParams.CostPerByte.Amount.MulRaw(int64(len(blob)))) + if len(blob) > int(c.daConfig.DAParams.MaxBlobSize) { + return submitBatchResult{}, fmt.Errorf("blob size %d exceeds the maximum allowed size %d", len(blob), c.daConfig.DAParams.MaxBlobSize) + } - // generate the submit blob msg + feesToPay := sdk.NewCoin(c.daConfig.DAParams.CostPerByte.Denom, c.daConfig.DAParams.CostPerByte.Amount.MulRaw(int64(len(blob)))) - msg := interchaindatypes.MsgSubmitBlob{ - Creator: "", + msg := interchainda.MsgSubmitBlob{ + Creator: c.daConfig.AccountName, Blob: blob, Fees: feesToPay, } - // use msg placeholder for now not to import the interchain-da module directly - msgP := new(sdk.Msg) - msg := *msgP - // wrap the msg into a tx - txBuilder := c.encodingConfig.NewTxBuilder() - err := txBuilder.SetMsgs(msg) + txResp, err := c.daClient.BroadcastTx(c.daConfig.AccountName, &msg) if err != nil { - return da.ResultSubmitBatch{ - BaseResult: da.BaseResult{Code: da.StatusError, Message: err.Error(), Error: err}, - } + return submitBatchResult{}, fmt.Errorf("can't broadcast MsgSubmitBlob to the DA layer: %w", err) } - - ctx := context.Background() - // sign and broadcast the tx - txBytes, err := c.encodingConfig.TxEncoder()(txBuilder.GetTx()) - if err != nil { - return da.ResultSubmitBatch{ - BaseResult: da.BaseResult{Code: da.StatusError, Message: err.Error(), Error: err}, - } + if txResp.Code != 0 { + return submitBatchResult{}, fmt.Errorf("MsgSubmitBlob broadcast tx status code is not 0: code %d", txResp.Code) } - txHash, err := c.txClient.BroadcastTx(ctx, &tx.BroadcastTxRequest{ - TxBytes: txBytes, - Mode: tx.BroadcastMode_BROADCAST_MODE_SYNC, - }) + var resp interchainda.MsgSubmitBlobResponse + err = txResp.Decode(&resp) if err != nil { - return da.ResultSubmitBatch{ - BaseResult: da.BaseResult{Code: da.StatusError, Message: err.Error(), Error: err}, - } + return submitBatchResult{}, fmt.Errorf("can't decode MsgSubmitBlob response: %w", err) } - c.txClient.GetTx(ctx, &tx.GetTxRequest{ - Hash: txHash, - }) - - // get the tx details - txRes, err = c.grpc.GetTxResponse(txhash) - blobId, blobHash, height = parse(txRes) - // trigger ibc stateupdate - optional (?) // other ibc interactions would trigger this anyway. But until then, inclusion cannot be verified. // better to trigger a stateupdate now imo dymension.tx.ibc.client.updatestate(c.daConfig.clientID) // could import the go relayer and execute their funcs - return ResultSubmitBatch{ - BaseResult: BaseResult("success"), - SubmitMetaData: ("interchain", height, blobId, blobHash, c.daConfig.clientID, ) - } + return submitBatchResult{ + BlobID: resp.BlobId, + BlobHash: resp.BlobHash, + }, nil } func (c *DataAvailabilityLayerClient) CheckBatchAvailability(daMetaData *da.DASubmitMetaData) da.ResultCheckBatch { diff --git a/types/pb/interchain_da/da.go b/types/pb/interchain_da/da.go new file mode 100644 index 000000000..c1bdb3dfc --- /dev/null +++ b/types/pb/interchain_da/da.go @@ -0,0 +1,13 @@ +package interchain_da + +import sdk "github.com/cosmos/cosmos-sdk/types" + +func (m MsgSubmitBlob) ValidateBasic() error { + // Validation is done on the DA layer side + return nil +} + +func (m MsgSubmitBlob) GetSigners() []sdk.AccAddress { + signer, _ := sdk.AccAddressFromBech32(m.Creator) + return []sdk.AccAddress{signer} +}