Skip to content

Commit

Permalink
Add EigenDA client using handshake based authentication
Browse files Browse the repository at this point in the history
  • Loading branch information
teddyknox committed May 9, 2024
1 parent 8ec570b commit 190aba6
Show file tree
Hide file tree
Showing 14 changed files with 357 additions and 53 deletions.
8 changes: 8 additions & 0 deletions api/client/cert.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package client

type Cert struct {
BatchHeaderHash []byte
BlobIndex uint32
ReferenceBlockNumber uint32
QuorumIDs []uint32
}
244 changes: 244 additions & 0 deletions api/client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
package client

import (
"bytes"
"context"
"crypto/tls"
"encoding/base64"
"encoding/binary"
"encoding/hex"
"fmt"
"time"

"github.com/Layr-Labs/eigenda/api/grpc/disperser"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/core/auth"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/Layr-Labs/eigenda/encoding/utils/codec"
"github.com/ethereum/go-ethereum/log"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)

type BlobEncodingVersion byte

var NoIFFT BlobEncodingVersion = 0x00

type IEigenDAClient interface {
GetBlob(ctx context.Context, BatchHeaderHash []byte, BlobIndex uint32) ([]byte, error)
PutBlob(ctx context.Context, txData []byte) (*Cert, error)
}

type EigenDAClient struct {
Config

Log log.Logger

client disperser.DisperserClient

signer *auth.LocalBlobRequestSigner
}

var _ IEigenDAClient = EigenDAClient{}

func NewEigenDAClient(ctx context.Context, log log.Logger, config Config) (*EigenDAClient, error) {
err := config.Check()
if err != nil {
return nil, err
}
client, err := NewDisperserClient(ctx, config.RPC)
if err != nil {
return nil, fmt.Errorf("failed to instatiated EigenDA GRPC client: %w", err)
}
return &EigenDAClient{
Log: log,
Config: config,
client: client,
signer: auth.NewLocalBlobRequestSigner(config.SignerPrivateKeyHex),
}, nil
}

func NewDisperserClient(ctx context.Context, rpc string) (disperser.DisperserClient, error) {
config := &tls.Config{}
credential := credentials.NewTLS(config)
dialOptions := []grpc.DialOption{grpc.WithTransportCredentials(credential)}
conn, err := grpc.Dial(rpc, dialOptions...)
if err != nil {
return nil, err
}
client := disperser.NewDisperserClient(conn)
return client, nil
}

func (m EigenDAClient) GetBlob(ctx context.Context, BatchHeaderHash []byte, BlobIndex uint32) ([]byte, error) {
reply, err := m.client.RetrieveBlob(ctx, &disperser.RetrieveBlobRequest{
BatchHeaderHash: BatchHeaderHash,
BlobIndex: BlobIndex,
})
if err != nil {
return nil, err
}

// decode modulo bn254
decodedData := codec.RemoveEmptyByteFromPaddedBytes(reply.Data)

// Return exact data with buffer removed
reader := bytes.NewReader(decodedData)

// read version byte, we will not use it for now since there is only one version
_, err = reader.ReadByte()
if err != nil {
return nil, fmt.Errorf("EigenDA client failed to read version byte")
}

// read length uvarint
length, err := binary.ReadUvarint(reader)
if err != nil {
return nil, fmt.Errorf("EigenDA client failed to decode length uvarint prefix")
}
data := make([]byte, length)
n, err := reader.Read(data)
if err != nil {
return nil, fmt.Errorf("EigenDA client failed to copy unpadded data into final buffer")
}
if uint64(n) != length {
return nil, fmt.Errorf("EigenDA client failed, data length does not match length prefix")
}

return data, nil
}

func (m EigenDAClient) PutBlob(ctx context.Context, data []byte) (*Cert, error) {
m.Log.Info("Attempting to disperse blob to EigenDA")

// encode current blob encoding version byte
data = append([]byte{byte(NoIFFT)}, data...)

// encode data length
data = append(ConvertIntToVarUInt(len(data)), data...)

// encode modulo bn254
data = codec.ConvertByPaddingEmptyByte(data)

// do auth handshake
disperseBlobAuthClient, err := m.client.DisperseBlobAuthenticated(ctx)
if err != nil {
return nil, fmt.Errorf("error initializing DisperseBlobAuthenticated() client: %w", err)
}

err = disperseBlobAuthClient.Send(&disperser.AuthenticatedRequest{
Payload: &disperser.AuthenticatedRequest_DisperseRequest{
DisperseRequest: &disperser.DisperseBlobRequest{
Data: data,
CustomQuorumNumbers: m.Config.CustomQuorumIDs,
AccountId: m.signer.GetAccountID(),
},
},
})
if err != nil {
return nil, fmt.Errorf("failed sending initial disperse blob authenticated request: %w", err)
}

reply, err := disperseBlobAuthClient.Recv()
if err != nil {
return nil, fmt.Errorf("failed receiving challenge parameter for disperse blob authenticated request: %w", err)
}

authHeaderReply, ok := reply.Payload.(*disperser.AuthenticatedReply_BlobAuthHeader)
if !ok {
return nil, fmt.Errorf("expected blob auth header message in response to initial disperse blob authenticated request: %w", err)
}

authHeader := core.BlobAuthHeader{
BlobCommitments: encoding.BlobCommitments{},
AccountID: "",
Nonce: authHeaderReply.BlobAuthHeader.ChallengeParameter,
}

authData, err := m.signer.SignBlobRequest(authHeader)
if err != nil {
return nil, fmt.Errorf("error signing challenge parameter while performing disperse blob authenticated request: %w", err)
}

// Process challenge and send back challenge_reply
err = disperseBlobAuthClient.Send(&disperser.AuthenticatedRequest{Payload: &disperser.AuthenticatedRequest_AuthenticationData{
AuthenticationData: &disperser.AuthenticationData{
AuthenticationData: authData,
},
}})
if err != nil {
return nil, fmt.Errorf("error writing signed challenge paramter in disperse blob authenticated request: %w", err)
}

reply, err = disperseBlobAuthClient.Recv()
if err != nil {
return nil, fmt.Errorf("error receiving signed challenge paramter in disperse blob authenticated request: %w", err)
}

disperseResWrapper, ok := reply.Payload.(*disperser.AuthenticatedReply_DisperseReply)
if !ok {
return nil, fmt.Errorf("expected disperser reply message in response to signed challenge parameter submission in disperse blob authenticated request: %w", err)
}

disperseRes := disperseResWrapper.DisperseReply

// process response
if disperseRes.Result == disperser.BlobStatus_UNKNOWN ||
disperseRes.Result == disperser.BlobStatus_FAILED {
m.Log.Error("Unable to disperse blob to EigenDA, aborting", "err", err)
return nil, fmt.Errorf("reply status is %d", disperseRes.Result)
}

base64RequestID := base64.StdEncoding.EncodeToString(disperseRes.RequestId)

m.Log.Info("Blob disepersed to EigenDA, now waiting for confirmation", "requestID", base64RequestID)

timeoutTime := time.Now().Add(m.StatusQueryTimeout)
ticker := time.NewTicker(m.StatusQueryRetryInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-ticker.C:
if time.Now().After(timeoutTime) {
return nil, fmt.Errorf("timed out waiting for EigenDA blob to confirm blob with request id: %s", base64RequestID)
}
statusRes, err := m.client.GetBlobStatus(ctx, &disperser.BlobStatusRequest{
RequestId: disperseRes.RequestId,
})
if err != nil {
m.Log.Warn("Unable to retrieve blob dispersal status, will retry", "requestID", base64RequestID, "err", err)
continue
}

switch statusRes.Status {
case disperser.BlobStatus_PROCESSING:
m.Log.Info("Waiting for confirmation from EigenDA", "requestID", base64RequestID)
case disperser.BlobStatus_FAILED:
m.Log.Error("EigenDA blob dispersal failed in processing", "requestID", base64RequestID, "err", err)
case disperser.BlobStatus_INSUFFICIENT_SIGNATURES:
m.Log.Error("EigenDA blob dispersal failed in processing with insufficient signatures", "requestID", base64RequestID, "err", err)
case disperser.BlobStatus_CONFIRMED:
m.Log.Info("EigenDA blob confirmed, waiting for finalization", "requestID", base64RequestID)
case disperser.BlobStatus_FINALIZED:
batchHeaderHashHex := fmt.Sprintf("0x%s", hex.EncodeToString(statusRes.Info.BlobVerificationProof.BatchMetadata.BatchHeaderHash))
m.Log.Info("Successfully dispersed blob to EigenDA", "requestID", base64RequestID, "batchHeaderHash", batchHeaderHashHex)
blobInfo := statusRes.Info
quorumIDs := make([]uint32, len(blobInfo.BlobHeader.BlobQuorumParams))
for i := range quorumIDs {
quorumIDs[i] = blobInfo.BlobHeader.BlobQuorumParams[i].QuorumNumber
}
cert := &Cert{
BatchHeaderHash: blobInfo.BlobVerificationProof.BatchMetadata.BatchHeaderHash,
BlobIndex: blobInfo.BlobVerificationProof.BlobIndex,
ReferenceBlockNumber: blobInfo.BlobVerificationProof.BatchMetadata.BatchHeader.ReferenceBlockNumber,
QuorumIDs: quorumIDs,
}
return cert, nil
default:
return nil, fmt.Errorf("EigenDA blob dispersal failed in processing with reply status %d", statusRes.Status)
}
}
}
}
40 changes: 40 additions & 0 deletions api/client/client_testnet_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package client_test

import (
"context"
"flag"
"os"
"testing"
"time"

"github.com/Layr-Labs/eigenda/api/client"
"github.com/ethereum/go-ethereum/log"
"github.com/stretchr/testify/assert"
)

var runTestnetIntegrationTests bool

func init() {
flag.BoolVar(&runTestnetIntegrationTests, "testnet-integration", false, "Run testnet-based integration tests")
}

func TestClient(t *testing.T) {
if !runTestnetIntegrationTests {
t.Skip("Skipping testnet integration test")
}
logger := log.NewLogger(log.NewTerminalHandler(os.Stderr, true))
client, err := client.NewEigenDAClient(context.Background(), logger, client.Config{
RPC: "disperser-holesky.eigenda.xyz:443",
StatusQueryTimeout: 25 * time.Minute,
StatusQueryRetryInterval: 5 * time.Second,
CustomQuorumIDs: []uint32{},
SignerPrivateKeyHex: "2d23e142a9e86a9175b9dfa213f20ea01f6c1731e09fa6edf895f70fe279cbb1",
})
data := "hello world!"
assert.NoError(t, err)
cert, err := client.PutBlob(context.Background(), []byte(data))
assert.NoError(t, err)
blob, err := client.GetBlob(context.Background(), cert.BatchHeaderHash, cert.BlobIndex)
assert.NoError(t, err)
assert.Equal(t, data, string(blob))
}
40 changes: 40 additions & 0 deletions api/client/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package client

import (
"fmt"
"time"
)

type Config struct {
// RPC is the HTTP provider URL for the Data Availability node.
RPC string

// The total amount of time that the batcher will spend waiting for EigenDA to confirm a blob
StatusQueryTimeout time.Duration

// The amount of time to wait between status queries of a newly dispersed blob
StatusQueryRetryInterval time.Duration

// The quorum IDs to write blobs to using this client. Should not include quorums 0 or 1.
CustomQuorumIDs []uint32

// Signer private key in hex encoded format. This key should not be associated with an Ethereum address holding any funds.
SignerPrivateKeyHex string
}

var DefaultQuorums = map[uint32]bool{0: true, 1: true}

func (c *Config) Check() error {
for _, e := range c.CustomQuorumIDs {
if DefaultQuorums[e] {
return fmt.Errorf("EigenDA client config failed validation because CustomQuorumIDs includes a default quorum ID %d. Because it is included by default this quorum ID can be removed from the client configuration", e)
}
}
if c.StatusQueryRetryInterval == 0 {
c.StatusQueryRetryInterval = 5 * time.Second
}
if c.StatusQueryTimeout == 0 {
c.StatusQueryTimeout = 25 * time.Minute
}
return nil
}
9 changes: 9 additions & 0 deletions api/client/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package client

import "encoding/binary"

func ConvertIntToVarUInt(v int) []byte {
buf := make([]byte, binary.MaxVarintLen64)
n := binary.PutUvarint(buf, uint64(v))
return buf[:n]
}
1 change: 1 addition & 0 deletions api/client/utils_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package client_test
18 changes: 0 additions & 18 deletions api/go.mod

This file was deleted.

21 changes: 0 additions & 21 deletions api/go.sum

This file was deleted.

Loading

0 comments on commit 190aba6

Please sign in to comment.