Skip to content

Commit

Permalink
feat!: using celestia RPC instead of Rest API (#544)
Browse files Browse the repository at this point in the history
  • Loading branch information
mtsitrin authored Jan 1, 2024
1 parent c68e540 commit 379b93e
Show file tree
Hide file tree
Showing 21 changed files with 872 additions and 748 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ jobs:
- uses: actions/checkout@v4
- uses: actions/setup-go@v4
with:
go-version: '1.19'
go-version: '1.20.5'
- run: git config --global url.https://[email protected]/.insteadOf https://github.com/
- name: golangci-lint
uses: golangci/[email protected]
with:
# Optional: version of golangci-lint to use in form of v1.2 or v1.2.3 or `latest` to use the latest version
version: v1.49
version: v1.52.0

# Optional: working directory, useful for monorepos
# working-directory: somedir
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: 1.19
go-version: 1.20.5
- run: git config --global url.https://[email protected]/.insteadOf https://github.com/

- name: Build
Expand Down
2 changes: 1 addition & 1 deletion config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func DefaultConfig(home, chainId string) *NodeConfig {
BlockTime: 200 * time.Millisecond,
EmptyBlocksMaxTime: 3 * time.Second,
BatchSubmitMaxTime: 30 * time.Second,
NamespaceID: "000000000000ffff",
NamespaceID: "0000000000000000ffff",
BlockBatchSize: 500,
BlockBatchMaxSizeBytes: 500000,
GossipedBlocksCacheSize: 50},
Expand Down
2 changes: 1 addition & 1 deletion config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ block_batch_max_size_bytes = {{ .BlockManagerConfig.BlockBatchMaxSizeBytes }}
gossiped_blocks_cache_size = {{ .BlockManagerConfig.GossipedBlocksCacheSize }}
#celestia config example:
# da_config = "{\"base_url\": \"http://127.0.0.1:26659\", \"timeout\": 60000000000, \"gas_prices\":0.1, \"gas_adjustment\": 1.3, \"namespace_id\":\"000000000000ffff\"}"
# da_config = "{\"base_url\": \"http://127.0.0.1:26658\", \"timeout\": 60000000000, \"gas_prices\":0.1, \"gas_adjustment\": 1.3, \"token\":\"TOKEN\"}"
# Avail config example:
# da_config = "{\"seed\": \"MNEMONIC\", \"api_url\": \"wss://kate.avail.tools/ws\", \"app_id\": 0, \"tip\":10}"
Expand Down
202 changes: 84 additions & 118 deletions da/celestia/celestia.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,32 @@ package celestia

import (
"context"
"encoding/hex"
"encoding/json"
"errors"
"time"

"github.com/avast/retry-go/v4"
"cosmossdk.io/math"
"github.com/gogo/protobuf/proto"
"github.com/tendermint/tendermint/libs/pubsub"
rpcclient "github.com/tendermint/tendermint/rpc/client"
httprpcclient "github.com/tendermint/tendermint/rpc/client/http"

cnc "github.com/celestiaorg/go-cnc"
openrpc "github.com/rollkit/celestia-openrpc"

"github.com/rollkit/celestia-openrpc/types/blob"
"github.com/rollkit/celestia-openrpc/types/share"

"github.com/dymensionxyz/dymint/da"
celtypes "github.com/dymensionxyz/dymint/da/celestia/types"
"github.com/dymensionxyz/dymint/log"
"github.com/dymensionxyz/dymint/store"
"github.com/dymensionxyz/dymint/types"
pb "github.com/dymensionxyz/dymint/types/pb/dymint"
)

type CNCClientI interface {
SubmitPFB(ctx context.Context, namespaceID cnc.Namespace, blob []byte, fee int64, gasLimit uint64) (*cnc.TxResponse, error)
NamespacedShares(ctx context.Context, namespaceID cnc.Namespace, height uint64) ([][]byte, error)
NamespacedData(ctx context.Context, namespaceID cnc.Namespace, height uint64) ([][]byte, error)
}

// DataAvailabilityLayerClient use celestia-node public API.
type DataAvailabilityLayerClient struct {
client CNCClientI
rpc celtypes.CelestiaRPCClient

pubsubServer *pubsub.Server
RPCClient rpcclient.Client
config Config
logger log.Logger
ctx context.Context
Expand All @@ -44,17 +40,10 @@ type DataAvailabilityLayerClient struct {
var _ da.DataAvailabilityLayerClient = &DataAvailabilityLayerClient{}
var _ da.BatchRetriever = &DataAvailabilityLayerClient{}

// WithCNCClient sets CNC client.
func WithCNCClient(client CNCClientI) da.Option {
return func(daLayerClient da.DataAvailabilityLayerClient) {
daLayerClient.(*DataAvailabilityLayerClient).client = client
}
}

// WithRPCClient sets rpc client.
func WithRPCClient(rpcClient rpcclient.Client) da.Option {
func WithRPCClient(rpc celtypes.CelestiaRPCClient) da.Option {
return func(daLayerClient da.DataAvailabilityLayerClient) {
daLayerClient.(*DataAvailabilityLayerClient).RPCClient = rpcClient
daLayerClient.(*DataAvailabilityLayerClient).rpc = rpc
}
}

Expand Down Expand Up @@ -112,28 +101,64 @@ func (c *DataAvailabilityLayerClient) Init(config []byte, pubsubServer *pubsub.S
c.txPollingRetryDelay = defaultTxPollingRetryDelay
c.txPollingAttempts = defaultTxPollingAttempts
c.submitRetryDelay = defaultSubmitRetryDelay
c.RPCClient, err = httprpcclient.New(c.config.AppNodeURL, "")
if err != nil {
return err
}
c.client, err = cnc.NewClient(c.config.BaseURL, cnc.WithTimeout(c.config.Timeout))
if err != nil {
return err
}

c.ctx, c.cancel = context.WithCancel(context.Background())

// Apply options
for _, apply := range options {
apply(c)
}

c.ctx, c.cancel = context.WithCancel(context.Background())

return nil
}

// Start prepares DataAvailabilityLayerClient to work.
func (c *DataAvailabilityLayerClient) Start() error {
func (c *DataAvailabilityLayerClient) Start() (err error) {
c.logger.Info("starting Celestia Data Availability Layer Client")

// other client has already been set
if c.rpc != nil {
c.logger.Debug("celestia-node client already set")
return nil
}

rpc, err := openrpc.NewClient(c.ctx, c.config.BaseURL, c.config.AuthToken)
if err != nil {
return err
}

state, err := rpc.Header.SyncState(c.ctx)
if err != nil {
return err
}

if !state.Finished() {
c.logger.Info("waiting for celestia-node to finish syncing", "height", state.Height, "target", state.ToHeight)

done := make(chan error, 1)
go func() {
done <- rpc.Header.SyncWait(c.ctx)
}()

ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()

for {
select {
case err := <-done:
if err != nil {
return err
}
return nil
case <-ticker.C:
c.logger.Info("celestia-node still syncing", "height", state.Height, "target", state.ToHeight)
}
}
}

c.logger.Info("celestia-node is synced", "height", state.ToHeight)

c.rpc = NewOpenRPC(rpc)
return nil
}

Expand All @@ -155,7 +180,7 @@ func (c *DataAvailabilityLayerClient) GetClientType() da.Client {

// SubmitBatch submits a batch to the DA layer.
func (c *DataAvailabilityLayerClient) SubmitBatch(batch *types.Batch) da.ResultSubmitBatch {
blob, err := batch.MarshalBinary()
data, err := batch.MarshalBinary()
if err != nil {
return da.ResultSubmitBatch{
BaseResult: da.BaseResult{
Expand All @@ -164,20 +189,31 @@ func (c *DataAvailabilityLayerClient) SubmitBatch(batch *types.Batch) da.ResultS
},
}
}
estimatedGas := DefaultEstimateGas(uint32(len(blob)))

blockBlob, err := blob.NewBlobV0(c.config.NamespaceID.Bytes(), data)
if err != nil {
return da.ResultSubmitBatch{
BaseResult: da.BaseResult{
Code: da.StatusError,
Message: err.Error(),
},
}
}
blobs := []*blob.Blob{blockBlob}

estimatedGas := DefaultEstimateGas(uint32(len(data)))
gasWanted := uint64(float64(estimatedGas) * c.config.GasAdjustment)
fees := c.calculateFees(gasWanted)
c.logger.Debug("Submitting to da blob with size", "size", len(blob), "estimatedGas", estimatedGas, "gasAdjusted", gasWanted, "fees", fees)
c.logger.Debug("Submitting to da blob with size", "size", len(blockBlob.Data), "estimatedGas", estimatedGas, "gasAdjusted", gasWanted, "fees", fees)

for {
select {
case <-c.ctx.Done():
c.logger.Debug("Context cancelled")
return da.ResultSubmitBatch{}
default:
//SubmitPFB sets an error if the txResponse has error, so we check check the txResponse for error
txResponse, err := c.client.SubmitPFB(c.ctx, c.config.NamespaceID, blob, fees, gasWanted)
if txResponse == nil {
txResponse, err := c.rpc.SubmitPayForBlob(c.ctx, math.NewInt(fees), gasWanted, blobs)
if err != nil {
c.logger.Error("Failed to submit DA batch. Emitting health event and trying again", "error", err)
res, err := da.SubmitBatchHealthEventHelper(c.pubsubServer, c.ctx, false, err)
if err != nil {
Expand All @@ -187,34 +223,18 @@ func (c *DataAvailabilityLayerClient) SubmitBatch(batch *types.Batch) da.ResultS
continue
}

if txResponse.Code != 0 {
c.logger.Error("Failed to submit DA batch. Emitting health event and trying again", "txResponse", txResponse.RawLog, "code", txResponse.Code)
res, err := da.SubmitBatchHealthEventHelper(c.pubsubServer, c.ctx, false, errors.New(txResponse.RawLog))
//double check txResponse is not nil - not supposed to happen
if txResponse == nil {
err := errors.New("txResponse is nil")
c.logger.Error("Failed to submit DA batch", "error", err)
res, err := da.SubmitBatchHealthEventHelper(c.pubsubServer, c.ctx, false, err)
if err != nil {
return res
}
time.Sleep(c.submitRetryDelay)
continue
}

// Here we assume that if txResponse is not nil and also error is not nil it means that the transaction
// was submitted (not necessarily accepted) and we still didn't get a clear status regarding it (e.g timeout).
// hence trying to poll for it.
daHeight := uint64(txResponse.Height)
if daHeight == 0 {
c.logger.Debug("Failed to receive DA batch inclusion result. Waiting for inclusion", "txHash", txResponse.TxHash)
daHeight, err = c.waitForTXInclusion(txResponse.TxHash)
if err != nil {
c.logger.Error("Failed to receive DA batch inclusion result. Emitting health event and trying again", "error", err)
res, err := da.SubmitBatchHealthEventHelper(c.pubsubServer, c.ctx, false, err)
if err != nil {
return res
}
time.Sleep(c.submitRetryDelay)
continue
}
}

c.logger.Info("Successfully submitted DA batch", "txHash", txResponse.TxHash, "daHeight", txResponse.Height, "gasWanted", txResponse.GasWanted, "gasUsed", txResponse.GasUsed)
res, err := da.SubmitBatchHealthEventHelper(c.pubsubServer, c.ctx, true, nil)
if err != nil {
Expand All @@ -224,37 +244,16 @@ func (c *DataAvailabilityLayerClient) SubmitBatch(batch *types.Batch) da.ResultS
BaseResult: da.BaseResult{
Code: da.StatusSuccess,
Message: "tx hash: " + txResponse.TxHash,
DAHeight: daHeight,
DAHeight: uint64(txResponse.Height),
},
}
}
}
}

// CheckBatchAvailability queries DA layer to check data availability of block at given height.
func (c *DataAvailabilityLayerClient) CheckBatchAvailability(dataLayerHeight uint64) da.ResultCheckBatch {
shares, err := c.client.NamespacedShares(c.ctx, c.config.NamespaceID, dataLayerHeight)
if err != nil {
return da.ResultCheckBatch{
BaseResult: da.BaseResult{
Code: da.StatusError,
Message: err.Error(),
},
}
}

return da.ResultCheckBatch{
BaseResult: da.BaseResult{
Code: da.StatusSuccess,
DAHeight: dataLayerHeight,
},
DataAvailable: len(shares) > 0,
}
}

// RetrieveBatches gets a batch of blocks from DA layer.
func (c *DataAvailabilityLayerClient) RetrieveBatches(dataLayerHeight uint64) da.ResultRetrieveBatch {
data, err := c.client.NamespacedData(c.ctx, c.config.NamespaceID, dataLayerHeight)
blobs, err := c.rpc.GetAll(c.ctx, dataLayerHeight, []share.Namespace{c.config.NamespaceID.Bytes()})
if err != nil {
return da.ResultRetrieveBatch{
BaseResult: da.BaseResult{
Expand All @@ -265,9 +264,9 @@ func (c *DataAvailabilityLayerClient) RetrieveBatches(dataLayerHeight uint64) da
}

var batches []*types.Batch
for i, msg := range data {
for i, blob := range blobs {
var batch pb.Batch
err = proto.Unmarshal(msg, &batch)
err = proto.Unmarshal(blob.Data, &batch)
if err != nil {
c.logger.Error("failed to unmarshal block", "daHeight", dataLayerHeight, "position", i, "error", err)
continue
Expand All @@ -293,36 +292,3 @@ func (c *DataAvailabilityLayerClient) RetrieveBatches(dataLayerHeight uint64) da
Batches: batches,
}
}

// FIXME(omritoptix): currently we're relaying on a node without validating it using a light client.
// should be proxied through light client once it's supported (https://github.com/dymensionxyz/dymint/issues/335).
func (c *DataAvailabilityLayerClient) waitForTXInclusion(txHash string) (uint64, error) {

hashBytes, err := hex.DecodeString(txHash)
if err != nil {
return 0, err
}

inclusionHeight := uint64(0)

err = retry.Do(func() error {
result, err := c.RPCClient.Tx(c.ctx, hashBytes, false)
if err != nil {
return err
}

if result == nil || err != nil {
c.logger.Error("couldn't get transaction from node", "err", err)
return errors.New("transaction not found")
}

inclusionHeight = uint64(result.Height)

return nil
}, retry.Attempts(uint(c.txPollingAttempts)), retry.DelayType(retry.FixedDelay), retry.Delay(c.txPollingRetryDelay))

if err != nil {
return 0, err
}
return inclusionHeight, nil
}
Loading

0 comments on commit 379b93e

Please sign in to comment.