Skip to content

Commit

Permalink
Add EigenDA client using handshake based authentication (#551)
Browse files Browse the repository at this point in the history
  • Loading branch information
teddyknox authored May 23, 2024
1 parent fa5866c commit 003c358
Show file tree
Hide file tree
Showing 34 changed files with 878 additions and 77 deletions.
51 changes: 51 additions & 0 deletions api/clients/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package clients

import (
"fmt"
"time"
)

type EigenDAClientConfig struct {
// RPC is the HTTP provider URL for the Data Availability node.
RPC string

// The total amount of time that the client will spend waiting for EigenDA to confirm a blob
StatusQueryTimeout time.Duration

// The amount of time to wait between status queries of a newly dispersed blob
StatusQueryRetryInterval time.Duration

// The total amount of time that the client will waiting for a response from the EigenDA disperser
ResponseTimeout time.Duration

// The quorum IDs to write blobs to using this client. Should not include default quorums 0 or 1.
CustomQuorumIDs []uint

// Signer private key in hex encoded format. This key should not be associated with an Ethereum address holding any funds.
SignerPrivateKeyHex string

// Whether to disable TLS for an insecure connection when connecting to a local EigenDA disperser instance.
DisableTLS bool

// The blob encoding version to use when writing blobs from the high level interface.
PutBlobEncodingVersion BlobEncodingVersion
}

func (c *EigenDAClientConfig) CheckAndSetDefaults() error {
if c.StatusQueryRetryInterval == 0 {
c.StatusQueryRetryInterval = 5 * time.Second
}
if c.StatusQueryTimeout == 0 {
c.StatusQueryTimeout = 25 * time.Minute
}
if c.ResponseTimeout == 0 {
c.ResponseTimeout = 30 * time.Second
}
if len(c.SignerPrivateKeyHex) != 64 {
return fmt.Errorf("EigenDAClientConfig.SignerPrivateKeyHex should be 64 hex characters long, should not have 0x prefix")
}
if len(c.RPC) == 0 {
return fmt.Errorf("EigenDAClientConfig.RPC not set")
}
return nil
}
21 changes: 21 additions & 0 deletions clients/disperser_client.go → api/clients/disperser_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type DisperserClient interface {
DisperseBlob(ctx context.Context, data []byte, customQuorums []uint8) (*disperser.BlobStatus, []byte, error)
DisperseBlobAuthenticated(ctx context.Context, data []byte, customQuorums []uint8) (*disperser.BlobStatus, []byte, error)
GetBlobStatus(ctx context.Context, key []byte) (*disperser_rpc.BlobStatusReply, error)
RetrieveBlob(ctx context.Context, batchHeaderHash []byte, blobIndex uint32) ([]byte, error)
}

type disperserClient struct {
Expand Down Expand Up @@ -222,3 +223,23 @@ func (c *disperserClient) GetBlobStatus(ctx context.Context, requestID []byte) (

return reply, nil
}

func (c *disperserClient) RetrieveBlob(ctx context.Context, batchHeaderHash []byte, blobIndex uint32) ([]byte, error) {
addr := fmt.Sprintf("%v:%v", c.config.Hostname, c.config.Port)
dialOptions := c.getDialOptions()
conn, err := grpc.Dial(addr, dialOptions...)
if err != nil {
return nil, err
}
disperserClient := disperser_rpc.NewDisperserClient(conn)
ctxTimeout, cancel := context.WithTimeout(ctx, time.Second*60)
defer cancel()
reply, err := disperserClient.RetrieveBlob(ctxTimeout, &disperser_rpc.RetrieveBlobRequest{
BatchHeaderHash: batchHeaderHash,
BlobIndex: blobIndex,
})
if err != nil {
return nil, err
}
return reply.Data, nil
}
175 changes: 175 additions & 0 deletions api/clients/eigenda_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
package clients

import (
"context"
"encoding/base64"
"encoding/hex"
"fmt"
"net"
"time"

grpcdisperser "github.com/Layr-Labs/eigenda/api/grpc/disperser"
"github.com/Layr-Labs/eigenda/core/auth"
"github.com/Layr-Labs/eigenda/disperser"
"github.com/ethereum/go-ethereum/log"
)

type IEigenDAClient interface {
GetBlob(ctx context.Context, BatchHeaderHash []byte, BlobIndex uint32) ([]byte, error)
PutBlob(ctx context.Context, txData []byte) (*grpcdisperser.BlobInfo, error)
}

type EigenDAClient struct {
Config EigenDAClientConfig
Log log.Logger
Client DisperserClient
PutCodec BlobCodec
}

var _ IEigenDAClient = EigenDAClient{}

func NewEigenDAClient(log log.Logger, config EigenDAClientConfig) (*EigenDAClient, error) {
err := config.CheckAndSetDefaults()
if err != nil {
return nil, err
}

host, port, err := net.SplitHostPort(config.RPC)
if err != nil {
return nil, fmt.Errorf("failed to parse EigenDA RPC: %w", err)
}

signer := auth.NewLocalBlobRequestSigner(config.SignerPrivateKeyHex)
llConfig := NewConfig(host, port, config.ResponseTimeout, !config.DisableTLS)
llClient := NewDisperserClient(llConfig, signer)

codec, err := BlobEncodingVersionToCodec(config.PutBlobEncodingVersion)
if err != nil {
return nil, fmt.Errorf("error initializing EigenDA client: %w", err)
}

return &EigenDAClient{
Log: log,
Config: config,
Client: llClient,
PutCodec: codec,
}, nil
}

func (m EigenDAClient) GetBlob(ctx context.Context, BatchHeaderHash []byte, BlobIndex uint32) ([]byte, error) {
data, err := m.Client.RetrieveBlob(ctx, BatchHeaderHash, BlobIndex)
if err != nil {
return nil, err
}

if len(data) == 0 {
return nil, fmt.Errorf("blob has length zero")
}

version := BlobEncodingVersion(data[0])
codec, err := BlobEncodingVersionToCodec(version)
if err != nil {
return nil, fmt.Errorf("error getting blob: %w", err)
}

rawData, err := codec.DecodeBlob(data)
if err != nil {
return nil, fmt.Errorf("error getting blob: %w", err)
}

return rawData, nil
}

func (m EigenDAClient) PutBlob(ctx context.Context, data []byte) (*grpcdisperser.BlobInfo, error) {
resultChan, errorChan := m.PutBlobAsync(ctx, data)
select { // no timeout here because we depend on the configured timeout in PutBlobAsync
case result := <-resultChan:
return result, nil
case err := <-errorChan:
return nil, err
}
}

func (m EigenDAClient) PutBlobAsync(ctx context.Context, data []byte) (resultChan chan *grpcdisperser.BlobInfo, errChan chan error) {
resultChan = make(chan *grpcdisperser.BlobInfo, 1)
errChan = make(chan error, 1)
go m.putBlob(ctx, data, resultChan, errChan)
return
}

func (m EigenDAClient) putBlob(ctx context.Context, rawData []byte, resultChan chan *grpcdisperser.BlobInfo, errChan chan error) {
m.Log.Info("Attempting to disperse blob to EigenDA")

// encode blob
if m.PutCodec == nil {
errChan <- fmt.Errorf("PutCodec cannot be nil")
return
}
data := m.PutCodec.EncodeBlob(rawData)

customQuorumNumbers := make([]uint8, len(m.Config.CustomQuorumIDs))
for i, e := range m.Config.CustomQuorumIDs {
customQuorumNumbers[i] = uint8(e)
}

// disperse blob
blobStatus, requestID, err := m.Client.DisperseBlobAuthenticated(ctx, data, customQuorumNumbers)
if err != nil {
errChan <- fmt.Errorf("error initializing DisperseBlobAuthenticated() client: %w", err)
return
}

// process response
if *blobStatus == disperser.Failed {
m.Log.Error("Unable to disperse blob to EigenDA, aborting", "err", err)
errChan <- fmt.Errorf("reply status is %d", blobStatus)
return
}

base64RequestID := base64.StdEncoding.EncodeToString(requestID)
m.Log.Info("Blob dispersed to EigenDA, now waiting for confirmation", "requestID", base64RequestID)

ticker := time.NewTicker(m.Config.StatusQueryRetryInterval)
defer ticker.Stop()

var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, m.Config.StatusQueryTimeout)
defer cancel()

for {
select {
case <-ctx.Done():
errChan <- fmt.Errorf("timed out waiting for EigenDA blob to confirm blob with request id=%s: %w", base64RequestID, ctx.Err())
return
case <-ticker.C:
statusRes, err := m.Client.GetBlobStatus(ctx, requestID)
if err != nil {
m.Log.Error("Unable to retrieve blob dispersal status, will retry", "requestID", base64RequestID, "err", err)
continue
}

switch statusRes.Status {
case grpcdisperser.BlobStatus_PROCESSING, grpcdisperser.BlobStatus_DISPERSING:
m.Log.Info("Blob submitted, waiting for dispersal from EigenDA", "requestID", base64RequestID)
case grpcdisperser.BlobStatus_FAILED:
m.Log.Error("EigenDA blob dispersal failed in processing", "requestID", base64RequestID, "err", err)
errChan <- fmt.Errorf("EigenDA blob dispersal failed in processing, requestID=%s: %w", base64RequestID, err)
return
case grpcdisperser.BlobStatus_INSUFFICIENT_SIGNATURES:
m.Log.Error("EigenDA blob dispersal failed in processing with insufficient signatures", "requestID", base64RequestID, "err", err)
errChan <- fmt.Errorf("EigenDA blob dispersal failed in processing with insufficient signatures, requestID=%s: %w", base64RequestID, err)
return
case grpcdisperser.BlobStatus_CONFIRMED:
m.Log.Info("EigenDA blob confirmed, waiting for finalization", "requestID", base64RequestID)
case grpcdisperser.BlobStatus_FINALIZED:
batchHeaderHashHex := fmt.Sprintf("0x%s", hex.EncodeToString(statusRes.Info.BlobVerificationProof.BatchMetadata.BatchHeaderHash))
m.Log.Info("Successfully dispersed blob to EigenDA", "requestID", base64RequestID, "batchHeaderHash", batchHeaderHashHex)
resultChan <- statusRes.Info
return
default:
errChan <- fmt.Errorf("EigenDA blob dispersal failed in processing with reply status %d", statusRes.Status)
return
}
}
}
}
Loading

0 comments on commit 003c358

Please sign in to comment.