relay client
ian-shim committed Nov 8, 2024
1 parent 6e14746 commit 113b4c6
Showing 10 changed files with 293 additions and 106 deletions.
39 changes: 39 additions & 0 deletions api/clients/mock/relay_client.go
@@ -0,0 +1,39 @@
package mock

import (
	"context"

	"github.com/Layr-Labs/eigenda/api/clients"
	corev2 "github.com/Layr-Labs/eigenda/core/v2"
	"github.com/stretchr/testify/mock"
)

type MockRelayClient struct {
	mock.Mock
}

var _ clients.RelayClient = (*MockRelayClient)(nil)

func NewRelayClient() *MockRelayClient {
	return &MockRelayClient{}
}

func (c *MockRelayClient) GetBlob(ctx context.Context, relayKey corev2.RelayKey, blobKey corev2.BlobKey) ([]byte, error) {
	args := c.Called(blobKey)
	return args.Get(0).([]byte), args.Error(1)
}

func (c *MockRelayClient) GetChunksByRange(ctx context.Context, requests []*clients.ChunkRequestByRange) ([]clients.BundleResult, error) {
	args := c.Called()
	return args.Get(0).([]clients.BundleResult), args.Error(1)
}

func (c *MockRelayClient) GetChunksByIndex(ctx context.Context, requests []*clients.ChunkRequestByIndex) ([]clients.BundleResult, error) {
	args := c.Called()
	return args.Get(0).([]clients.BundleResult), args.Error(1)
}

func (c *MockRelayClient) Close() error {
	args := c.Called()
	return args.Error(0)
}
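
A hedged sketch of how this mock might be wired into a test with testify; the key, relay, and payload values below are illustrative, not part of the commit. Note that the mock's GetBlob records only the blob key, so expectations match on blobKey alone:

package mock_test

import (
	"context"
	"testing"

	"github.com/Layr-Labs/eigenda/api/clients/mock"
	corev2 "github.com/Layr-Labs/eigenda/core/v2"
	"github.com/stretchr/testify/assert"
)

func TestMockRelayClientGetBlob(t *testing.T) {
	client := mock.NewRelayClient()
	var blobKey corev2.BlobKey // zero value stands in for a real key
	var relayKey corev2.RelayKey
	expected := []byte("blob data")

	// GetBlob's mock implementation calls c.Called(blobKey), so only the
	// blob key participates in expectation matching.
	client.On("GetBlob", blobKey).Return(expected, nil)

	got, err := client.GetBlob(context.Background(), relayKey, blobKey)
	assert.NoError(t, err)
	assert.Equal(t, expected, got)
	client.AssertExpectations(t)
}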
3 changes: 1 addition & 2 deletions api/clients/node_client.go
@@ -8,7 +8,6 @@ import (
 	grpcnode "github.com/Layr-Labs/eigenda/api/grpc/node"
 	"github.com/Layr-Labs/eigenda/core"
 	"github.com/Layr-Labs/eigenda/encoding"
-	"github.com/Layr-Labs/eigenda/node"
 	"github.com/wealdtech/go-merkletree/v2"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/credentials/insecure"
@@ -64,7 +63,7 @@ func (c client) GetBlobHeader(
 		return nil, nil, err
 	}
 
-	blobHeader, err := node.GetBlobHeaderFromProto(reply.GetBlobHeader())
+	blobHeader, err := core.BlobHeaderFromProto(reply.GetBlobHeader())
 	if err != nil {
 		return nil, nil, err
 	}
149 changes: 149 additions & 0 deletions api/clients/relay_client.go
@@ -0,0 +1,149 @@
package clients

import (
	"context"
	"fmt"
	"sync"

	relaygrpc "github.com/Layr-Labs/eigenda/api/grpc/relay"
	"github.com/Layr-Labs/eigenda/core"
	corev2 "github.com/Layr-Labs/eigenda/core/v2"
	"github.com/Layr-Labs/eigensdk-go/logging"
	"github.com/hashicorp/go-multierror"
	"google.golang.org/grpc"
)

type RelayClientConfig struct {
	Sockets           map[corev2.RelayKey]string
	UseSecureGrpcFlag bool
}

type ChunkRequestByRange struct {
	BlobKey  corev2.BlobKey
	RelayKey corev2.RelayKey
	Start    uint32
	End      uint32
}

type ChunkRequestByIndex struct {
	BlobKey  corev2.BlobKey
	RelayKey corev2.RelayKey
	Indexes  []uint32
}

type BundleResult struct {
	Bundle core.Bundle
	Err    error
}

type RelayClient interface {
	// GetBlob retrieves a blob from a relay.
	GetBlob(ctx context.Context, relayKey corev2.RelayKey, blobKey corev2.BlobKey) ([]byte, error)
	// GetChunksByRange retrieves blob chunks from a relay by chunk index range.
	// The returned slice has the same length and ordering as the input slice; the i-th element is the bundle for the i-th request.
	GetChunksByRange(ctx context.Context, requests []*ChunkRequestByRange) ([]BundleResult, error)
	// GetChunksByIndex retrieves blob chunks from a relay by chunk indices.
	// The returned slice has the same length and ordering as the input slice; the i-th element is the bundle for the i-th request.
	GetChunksByIndex(ctx context.Context, requests []*ChunkRequestByIndex) ([]BundleResult, error)
	Close() error
}

type relayClient struct {
	config *RelayClientConfig

	initOnce map[corev2.RelayKey]*sync.Once
	conns    map[corev2.RelayKey]*grpc.ClientConn
	logger   logging.Logger

	grpcClients map[corev2.RelayKey]relaygrpc.RelayClient
}

var _ RelayClient = (*relayClient)(nil)

// NewRelayClient creates a new RelayClient that connects to the relays specified in the config.
// It maintains one connection per relay, instantiated lazily on first use and reused for subsequent requests.
func NewRelayClient(config *RelayClientConfig, logger logging.Logger) (*relayClient, error) {
	if config == nil || len(config.Sockets) == 0 {
		return nil, fmt.Errorf("invalid config: %v", config)
	}

	initOnce := make(map[corev2.RelayKey]*sync.Once)
	conns := make(map[corev2.RelayKey]*grpc.ClientConn)
	grpcClients := make(map[corev2.RelayKey]relaygrpc.RelayClient)
	for key := range config.Sockets {
		initOnce[key] = &sync.Once{}
	}
	return &relayClient{
		config: config,

		initOnce: initOnce,
		conns:    conns,
		logger:   logger,

		grpcClients: grpcClients,
	}, nil
}

func (c *relayClient) GetBlob(ctx context.Context, relayKey corev2.RelayKey, blobKey corev2.BlobKey) ([]byte, error) {
	if err := c.initOnceGrpcConnection(relayKey); err != nil {
		return nil, err
	}

	return nil, fmt.Errorf("not implemented")
}

func (c *relayClient) GetChunksByRange(ctx context.Context, requests []*ChunkRequestByRange) ([]BundleResult, error) {
	for _, req := range requests {
		if err := c.initOnceGrpcConnection(req.RelayKey); err != nil {
			return nil, err
		}
	}

	return nil, fmt.Errorf("not implemented")
}

func (c *relayClient) GetChunksByIndex(ctx context.Context, requests []*ChunkRequestByIndex) ([]BundleResult, error) {
	for _, req := range requests {
		if err := c.initOnceGrpcConnection(req.RelayKey); err != nil {
			return nil, err
		}
	}

	return nil, fmt.Errorf("not implemented")
}

func (c *relayClient) initOnceGrpcConnection(key corev2.RelayKey) error {
	// Check the key before touching the sync.Once: initOnce only holds keys
	// from config.Sockets, so an unknown key would otherwise dereference a
	// nil *sync.Once and panic instead of returning an error.
	once, ok := c.initOnce[key]
	if !ok {
		return fmt.Errorf("unknown relay key: %v", key)
	}
	var initErr error
	once.Do(func() {
		socket := c.config.Sockets[key]
		dialOptions := getGrpcDialOptions(c.config.UseSecureGrpcFlag)
		conn, err := grpc.Dial(socket, dialOptions...)
		if err != nil {
			initErr = err
			return
		}
		c.conns[key] = conn
		c.grpcClients[key] = relaygrpc.NewRelayClient(conn)
	})
	return initErr
}

func (c *relayClient) Close() error {
	var errList *multierror.Error
	for k, conn := range c.conns {
		if conn == nil {
			continue
		}
		if err := conn.Close(); err != nil {
			c.logger.Error("failed to close connection", "err", err)
			errList = multierror.Append(errList, err)
		}
		// Clear the map entries; assigning nil to the range variable alone
		// would not drop the stored references.
		c.conns[k] = nil
		c.grpcClients[k] = nil
	}
	return errList.ErrorOrNil()
}
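
For orientation, a minimal usage sketch of the client above. The socket addresses and the no-op logger are illustrative assumptions, not part of this commit, and GetBlob still returns "not implemented" at this point in the history:

package main

import (
	"context"
	"fmt"

	"github.com/Layr-Labs/eigenda/api/clients"
	corev2 "github.com/Layr-Labs/eigenda/core/v2"
	"github.com/Layr-Labs/eigensdk-go/logging"
)

func main() {
	cfg := &clients.RelayClientConfig{
		// Hypothetical relay endpoints; real deployments obtain these elsewhere.
		Sockets: map[corev2.RelayKey]string{
			0: "localhost:32001",
			1: "localhost:32002",
		},
		UseSecureGrpcFlag: false,
	}

	client, err := clients.NewRelayClient(cfg, logging.NewNoopLogger())
	if err != nil {
		panic(err)
	}
	defer client.Close()

	// The first call to a given relay dials its connection inside sync.Once;
	// subsequent calls reuse the cached *grpc.ClientConn.
	var blobKey corev2.BlobKey
	blob, err := client.GetBlob(context.Background(), 0, blobKey)
	fmt.Println(blob, err)
}

The per-key sync.Once keeps dialing off the constructor's critical path while guaranteeing each relay is dialed at most once, even under concurrent requests.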
92 changes: 92 additions & 0 deletions core/serialization.go
@@ -10,9 +10,13 @@ import (
	"regexp"
	"slices"

	"github.com/Layr-Labs/eigenda/api"
	binding "github.com/Layr-Labs/eigenda/contracts/bindings/EigenDAServiceManager"
	"github.com/Layr-Labs/eigenda/encoding"
	"github.com/consensys/gnark-crypto/ecc/bn254"
	"github.com/consensys/gnark-crypto/ecc/bn254/fp"

	pb "github.com/Layr-Labs/eigenda/api/grpc/node"
	"github.com/ethereum/go-ethereum/accounts/abi"
	"github.com/wealdtech/go-merkletree/v2"
	"github.com/wealdtech/go-merkletree/v2/keccak256"
@@ -393,6 +397,94 @@ func (h *BlobHeader) Deserialize(data []byte) (*BlobHeader, error) {
	return h, err
}

// BatchHeaderFromProtobuf constructs a core.BatchHeader from a proto of pb.BatchHeader.
// Note the StoreChunksRequest carrying the batch header is validated as soon as it
// enters the node gRPC interface; see grpc.Server.validateStoreChunkRequest.
func BatchHeaderFromProtobuf(in *pb.BatchHeader) (*BatchHeader, error) {
	if in == nil || len(in.GetBatchRoot()) == 0 {
		return nil, fmt.Errorf("batch header is nil or empty")
	}
	var batchRoot [32]byte
	copy(batchRoot[:], in.GetBatchRoot())
	return &BatchHeader{
		ReferenceBlockNumber: uint(in.GetReferenceBlockNumber()),
		BatchRoot:            batchRoot,
	}, nil
}

// BlobHeaderFromProto constructs a core.BlobHeader from a proto of pb.BlobHeader.
func BlobHeaderFromProto(h *pb.BlobHeader) (*BlobHeader, error) {
	if h == nil {
		return nil, fmt.Errorf("BlobHeaderFromProto: blob header is nil")
	}

	commitX := new(fp.Element).SetBytes(h.GetCommitment().GetX())
	commitY := new(fp.Element).SetBytes(h.GetCommitment().GetY())
	commitment := &encoding.G1Commitment{
		X: *commitX,
		Y: *commitY,
	}

	if !(*bn254.G1Affine)(commitment).IsInSubGroup() {
		return nil, errors.New("commitment is not in the subgroup")
	}

	var lengthCommitment, lengthProof encoding.G2Commitment
	if h.GetLengthCommitment() != nil {
		lengthCommitment.X.A0 = *new(fp.Element).SetBytes(h.GetLengthCommitment().GetXA0())
		lengthCommitment.X.A1 = *new(fp.Element).SetBytes(h.GetLengthCommitment().GetXA1())
		lengthCommitment.Y.A0 = *new(fp.Element).SetBytes(h.GetLengthCommitment().GetYA0())
		lengthCommitment.Y.A1 = *new(fp.Element).SetBytes(h.GetLengthCommitment().GetYA1())
	}

	if !(*bn254.G2Affine)(&lengthCommitment).IsInSubGroup() {
		return nil, errors.New("lengthCommitment is not in the subgroup")
	}

	if h.GetLengthProof() != nil {
		lengthProof.X.A0 = *new(fp.Element).SetBytes(h.GetLengthProof().GetXA0())
		lengthProof.X.A1 = *new(fp.Element).SetBytes(h.GetLengthProof().GetXA1())
		lengthProof.Y.A0 = *new(fp.Element).SetBytes(h.GetLengthProof().GetYA0())
		lengthProof.Y.A1 = *new(fp.Element).SetBytes(h.GetLengthProof().GetYA1())
	}

	if !(*bn254.G2Affine)(&lengthProof).IsInSubGroup() {
		return nil, errors.New("lengthProof is not in the subgroup")
	}

	quorumHeaders := make([]*BlobQuorumInfo, len(h.GetQuorumHeaders()))
	for i, header := range h.GetQuorumHeaders() {
		if header.GetQuorumId() > MaxQuorumID {
			return nil, api.NewErrorInvalidArg(fmt.Sprintf("quorum ID must be in range [0, %d], but found %d", MaxQuorumID, header.GetQuorumId()))
		}
		if err := ValidateSecurityParam(header.GetConfirmationThreshold(), header.GetAdversaryThreshold()); err != nil {
			return nil, err
		}

		quorumHeaders[i] = &BlobQuorumInfo{
			SecurityParam: SecurityParam{
				QuorumID:              QuorumID(header.GetQuorumId()),
				AdversaryThreshold:    uint8(header.GetAdversaryThreshold()),
				ConfirmationThreshold: uint8(header.GetConfirmationThreshold()),
				QuorumRate:            header.GetRatelimit(),
			},
			ChunkLength: uint(header.GetChunkLength()),
		}
	}

	return &BlobHeader{
		BlobCommitments: encoding.BlobCommitments{
			Commitment:       commitment,
			LengthCommitment: &lengthCommitment,
			LengthProof:      &lengthProof,
			Length:           uint(h.GetLength()),
		},
		QuorumInfos: quorumHeaders,
		AccountID:   h.AccountId,
	}, nil
}

func encode(obj any) ([]byte, error) {
	var buf bytes.Buffer
	enc := gob.NewEncoder(&buf)
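
The subgroup checks above reject curve points that arrive malformed over the wire. For orientation, a minimal standalone sketch of the same pattern, built only on gnark-crypto (the helper name is hypothetical):

package main

import (
	"errors"
	"fmt"

	"github.com/consensys/gnark-crypto/ecc/bn254"
	"github.com/consensys/gnark-crypto/ecc/bn254/fp"
)

// g1FromCoords rebuilds an affine G1 point from big-endian coordinate bytes
// and rejects it unless it lies in the expected subgroup, mirroring the
// commitment handling in BlobHeaderFromProto.
func g1FromCoords(xBytes, yBytes []byte) (*bn254.G1Affine, error) {
	var p bn254.G1Affine
	p.X = *new(fp.Element).SetBytes(xBytes)
	p.Y = *new(fp.Element).SetBytes(yBytes)
	if !p.IsInSubGroup() {
		return nil, errors.New("point is not in the subgroup")
	}
	return &p, nil
}

func main() {
	// The canonical G1 generator round-trips through the check.
	_, _, g1Aff, _ := bn254.Generators()
	x, y := g1Aff.X.Bytes(), g1Aff.Y.Bytes()
	p, err := g1FromCoords(x[:], y[:])
	fmt.Println(p, err)
}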
10 changes: 5 additions & 5 deletions node/grpc/server.go
@@ -74,9 +74,9 @@ func (s *Server) handleStoreChunksRequest(ctx context.Context, in *pb.StoreChunk
 	start := time.Now()
 
 	// Get batch header hash
-	batchHeader, err := node.GetBatchHeader(in.GetBatchHeader())
+	batchHeader, err := core.BatchHeaderFromProtobuf(in.GetBatchHeader())
 	if err != nil {
-		return nil, err
+		return nil, api.NewErrorInvalidArg(err.Error())
 	}
 
 	blobs, err := node.GetBlobMessages(in.GetBlobs(), s.node.Config.NumBatchDeserializationWorkers)
@@ -263,7 +263,7 @@ func (s *Server) AttestBatch(ctx context.Context, in *pb.AttestBatchRequest) (*p
 		copy(h[:], hash)
 		blobHeaderHashes[i] = h
 	}
-	batchHeader, err := node.GetBatchHeader(in.GetBatchHeader())
+	batchHeader, err := core.BatchHeaderFromProtobuf(in.GetBatchHeader())
 	if err != nil {
 		return nil, fmt.Errorf("failed to get the batch header: %w", err)
 	}
@@ -428,7 +428,7 @@ func (s *Server) rebuildMerkleTree(batchHeaderHash [32]byte) (*merkletree.Merkle
 		return nil, err
 	}
 
-	blobHeader, err := node.GetBlobHeaderFromProto(&protoBlobHeader)
+	blobHeader, err := core.BlobHeaderFromProto(&protoBlobHeader)
 	if err != nil {
 		return nil, err
 	}
@@ -470,7 +470,7 @@ func (s *Server) getBlobHeader(ctx context.Context, batchHeaderHash [32]byte, bl
 		return nil, nil, err
 	}
 
-	blobHeader, err := node.GetBlobHeaderFromProto(&protoBlobHeader)
+	blobHeader, err := core.BlobHeaderFromProto(&protoBlobHeader)
 	if err != nil {
 		return nil, nil, err
 	}
4 changes: 2 additions & 2 deletions node/grpc/server_test.go
@@ -466,7 +466,7 @@ func TestMinibatchDispersalAndRetrieval(t *testing.T) {
 	})
 	assert.NoError(t, err)
 	assert.NotNil(t, blobHeaderReply)
-	blobHeader, err := node.GetBlobHeaderFromProto(blobHeaderReply.GetBlobHeader())
+	blobHeader, err := core.BlobHeaderFromProto(blobHeaderReply.GetBlobHeader())
 	assert.NoError(t, err)
 	assert.Equal(t, blobHeader, blobHeaders[0])
 	proof := &merkletree.Proof{
@@ -484,7 +484,7 @@ func TestMinibatchDispersalAndRetrieval(t *testing.T) {
 	})
 	assert.NoError(t, err)
 	assert.NotNil(t, blobHeaderReply)
-	blobHeader, err = node.GetBlobHeaderFromProto(blobHeaderReply.GetBlobHeader())
+	blobHeader, err = core.BlobHeaderFromProto(blobHeaderReply.GetBlobHeader())
 	assert.NoError(t, err)
 	assert.Equal(t, blobHeader, blobHeaders[1])
 	proof = &merkletree.Proof{
2 changes: 1 addition & 1 deletion node/node.go
@@ -616,7 +616,7 @@ func (n *Node) ValidateBatchContents(ctx context.Context, blobHeaderHashes [][32
 		return errors.New("blob headers have different reference block numbers")
 	}
 
-	blobHeader, err := GetBlobHeaderFromProto(&protoBlobHeader)
+	blobHeader, err := core.BlobHeaderFromProto(&protoBlobHeader)
 	if err != nil {
 		return fmt.Errorf("failed to get blob header from proto: %w", err)
 	}
4 changes: 2 additions & 2 deletions node/store_test.go
@@ -475,15 +475,15 @@ func TestStoreBatchBlobMapping(t *testing.T) {
 	assert.Nil(t, err)
 	err = proto.Unmarshal(blobHeaderBytes0, &protoBlobHeader)
 	assert.Nil(t, err)
-	blobHeader0, err := node.GetBlobHeaderFromProto(&protoBlobHeader)
+	blobHeader0, err := core.BlobHeaderFromProto(&protoBlobHeader)
 	assert.Nil(t, err)
 
 	assert.Equal(t, blobHeader0, blobs[0].BlobHeader)
 	blobHeaderBytes1, err := s.GetBlobHeader(ctx, batchHeaderHash, 1)
 	assert.Nil(t, err)
 	err = proto.Unmarshal(blobHeaderBytes1, &protoBlobHeader)
 	assert.Nil(t, err)
-	blobHeader1, err := node.GetBlobHeaderFromProto(&protoBlobHeader)
+	blobHeader1, err := core.BlobHeaderFromProto(&protoBlobHeader)
 	assert.Nil(t, err)
 	assert.Equal(t, blobHeader1, blobs[1].BlobHeader)
 	blobHeaderBytes2, err := s.GetBlobHeader(ctx, batchHeaderHash, 2)