Skip to content

Commit

Permalink
relay client
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim committed Nov 12, 2024
1 parent e2989f9 commit 36bd63b
Show file tree
Hide file tree
Showing 9 changed files with 347 additions and 103 deletions.
39 changes: 39 additions & 0 deletions api/clients/mock/relay_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package mock

import (
"context"

"github.com/Layr-Labs/eigenda/api/clients"
corev2 "github.com/Layr-Labs/eigenda/core/v2"
"github.com/stretchr/testify/mock"
)

type MockRelayClient struct {
mock.Mock
}

var _ clients.RelayClient = (*MockRelayClient)(nil)

func NewRelayClient() *MockRelayClient {
return &MockRelayClient{}
}

func (c *MockRelayClient) GetBlob(ctx context.Context, relayKey corev2.RelayKey, blobKey corev2.BlobKey) ([]byte, error) {
args := c.Called(blobKey)
return args.Get(0).([]byte), args.Error(1)
}

func (c *MockRelayClient) GetChunksByRange(ctx context.Context, relayKey corev2.RelayKey, requests []*clients.ChunkRequestByRange) ([][]byte, error) {
args := c.Called()
return args.Get(0).([][]byte), args.Error(1)
}

func (c *MockRelayClient) GetChunksByIndex(ctx context.Context, relayKey corev2.RelayKey, requests []*clients.ChunkRequestByIndex) ([][]byte, error) {
args := c.Called()
return args.Get(0).([][]byte), args.Error(1)
}

func (c *MockRelayClient) Close() error {
args := c.Called()
return args.Error(0)
}
3 changes: 1 addition & 2 deletions api/clients/node_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
grpcnode "github.com/Layr-Labs/eigenda/api/grpc/node"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/Layr-Labs/eigenda/node"
"github.com/wealdtech/go-merkletree/v2"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
Expand Down Expand Up @@ -64,7 +63,7 @@ func (c client) GetBlobHeader(
return nil, nil, err
}

blobHeader, err := node.GetBlobHeaderFromProto(reply.GetBlobHeader())
blobHeader, err := core.BlobHeaderFromProto(reply.GetBlobHeader())
if err != nil {
return nil, nil, err
}
Expand Down
206 changes: 206 additions & 0 deletions api/clients/relay_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
package clients

import (
"context"
"fmt"
"sync"

relaygrpc "github.com/Layr-Labs/eigenda/api/grpc/relay"
corev2 "github.com/Layr-Labs/eigenda/core/v2"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/hashicorp/go-multierror"
"google.golang.org/grpc"
)

type RelayClientConfig struct {
Sockets map[corev2.RelayKey]string
UseSecureGrpcFlag bool
}

type ChunkRequestByRange struct {
BlobKey corev2.BlobKey
Start uint32
End uint32
}

type ChunkRequestByIndex struct {
BlobKey corev2.BlobKey
Indexes []uint32
}

type RelayClient interface {
// GetBlob retrieves a blob from a relay
GetBlob(ctx context.Context, relayKey corev2.RelayKey, blobKey corev2.BlobKey) ([]byte, error)
// GetChunksByRange retrieves blob chunks from a relay by chunk index range
// The returned slice has the same length and ordering as the input slice, and the i-th element is the bundle for the i-th request.
// Each bundle is a sequence of frames in raw form (i.e., serialized core.Bundle bytearray).
GetChunksByRange(ctx context.Context, relayKey corev2.RelayKey, requests []*ChunkRequestByRange) ([][]byte, error)
// GetChunksByIndex retrieves blob chunks from a relay by index
// The returned slice has the same length and ordering as the input slice, and the i-th element is the bundle for the i-th request.
// Each bundle is a sequence of frames in raw form (i.e., serialized core.Bundle bytearray).
GetChunksByIndex(ctx context.Context, relayKey corev2.RelayKey, requests []*ChunkRequestByIndex) ([][]byte, error)
Close() error
}

type relayClient struct {
config *RelayClientConfig

initOnce map[corev2.RelayKey]*sync.Once
conns map[corev2.RelayKey]*grpc.ClientConn
logger logging.Logger

grpcClients map[corev2.RelayKey]relaygrpc.RelayClient
}

var _ RelayClient = (*relayClient)(nil)

// NewRelayClient creates a new RelayClient that connects to the relays specified in the config.
// It keeps a connection to each relay and reuses it for subsequent requests, and the connection is lazily instantiated.
func NewRelayClient(config *RelayClientConfig, logger logging.Logger) (*relayClient, error) {
if config == nil || len(config.Sockets) > 0 {
return nil, fmt.Errorf("invalid config: %v", config)
}

initOnce := make(map[corev2.RelayKey]*sync.Once)
conns := make(map[corev2.RelayKey]*grpc.ClientConn)
grpcClients := make(map[corev2.RelayKey]relaygrpc.RelayClient)
for key := range config.Sockets {
initOnce[key] = &sync.Once{}
}
return &relayClient{
config: config,

initOnce: initOnce,
conns: conns,
logger: logger,

grpcClients: grpcClients,
}, nil
}

func (c *relayClient) GetBlob(ctx context.Context, relayKey corev2.RelayKey, blobKey corev2.BlobKey) ([]byte, error) {
if err := c.initOnceGrpcConnection(relayKey); err != nil {
return nil, err
}

client, ok := c.grpcClients[relayKey]
if !ok {
return nil, fmt.Errorf("no grpc client for relay key: %v", relayKey)
}

res, err := client.GetBlob(ctx, &relaygrpc.GetBlobRequest{
BlobKey: blobKey[:],
})
if err != nil {
return nil, err
}

return res.GetBlob(), nil
}

func (c *relayClient) GetChunksByRange(ctx context.Context, relayKey corev2.RelayKey, requests []*ChunkRequestByRange) ([][]byte, error) {
if len(requests) == 0 {
return nil, fmt.Errorf("no requests")
}
if err := c.initOnceGrpcConnection(relayKey); err != nil {
return nil, err
}

client, ok := c.grpcClients[relayKey]
if !ok {
return nil, fmt.Errorf("no grpc client for relay key: %v", relayKey)
}

grpcRequests := make([]*relaygrpc.ChunkRequest, len(requests))
for i, req := range requests {
grpcRequests[i] = &relaygrpc.ChunkRequest{
Request: &relaygrpc.ChunkRequest_ByRange{
ByRange: &relaygrpc.ChunkRequestByRange{
BlobKey: req.BlobKey[:],
StartIndex: req.Start,
EndIndex: req.End,
},
},
}
}
res, err := client.GetChunks(ctx, &relaygrpc.GetChunksRequest{
ChunkRequests: grpcRequests,
})

if err != nil {
return nil, err
}

return res.GetData(), nil
}

func (c *relayClient) GetChunksByIndex(ctx context.Context, relayKey corev2.RelayKey, requests []*ChunkRequestByIndex) ([][]byte, error) {
if len(requests) == 0 {
return nil, fmt.Errorf("no requests")
}
if err := c.initOnceGrpcConnection(relayKey); err != nil {
return nil, err
}

client, ok := c.grpcClients[relayKey]
if !ok {
return nil, fmt.Errorf("no grpc client for relay key: %v", relayKey)
}

grpcRequests := make([]*relaygrpc.ChunkRequest, len(requests))
for i, req := range requests {
grpcRequests[i] = &relaygrpc.ChunkRequest{
Request: &relaygrpc.ChunkRequest_ByIndex{
ByIndex: &relaygrpc.ChunkRequestByIndex{
BlobKey: req.BlobKey[:],
ChunkIndices: req.Indexes,
},
},
}
}
res, err := client.GetChunks(ctx, &relaygrpc.GetChunksRequest{
ChunkRequests: grpcRequests,
})

if err != nil {
return nil, err
}

return res.GetData(), nil
}

func (c *relayClient) initOnceGrpcConnection(key corev2.RelayKey) error {
var initErr error
c.initOnce[key].Do(func() {
socket, ok := c.config.Sockets[key]
if !ok {
initErr = fmt.Errorf("unknown relay key: %v", key)
return
}
dialOptions := getGrpcDialOptions(c.config.UseSecureGrpcFlag)
conn, err := grpc.Dial(socket, dialOptions...)
if err != nil {
initErr = err
return
}
c.conns[key] = conn
c.grpcClients[key] = relaygrpc.NewRelayClient(conn)
})
return initErr
}

func (c *relayClient) Close() error {
var errList *multierror.Error
for k, conn := range c.conns {
if conn != nil {
err := conn.Close()
conn = nil
c.grpcClients[k] = nil
if err != nil {
c.logger.Error("failed to close connection", "err", err)
errList = multierror.Append(errList, err)
}
}
}
return errList.ErrorOrNil()
}
92 changes: 92 additions & 0 deletions core/serialization.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,13 @@ import (
"regexp"
"slices"

"github.com/Layr-Labs/eigenda/api"
binding "github.com/Layr-Labs/eigenda/contracts/bindings/EigenDAServiceManager"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/consensys/gnark-crypto/ecc/bn254"
"github.com/consensys/gnark-crypto/ecc/bn254/fp"

pb "github.com/Layr-Labs/eigenda/api/grpc/node"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/wealdtech/go-merkletree/v2"
"github.com/wealdtech/go-merkletree/v2/keccak256"
Expand Down Expand Up @@ -393,6 +397,94 @@ func (h *BlobHeader) Deserialize(data []byte) (*BlobHeader, error) {
return h, err
}

// GetBatchHeader constructs a core.BatchHeader from a proto of pb.StoreChunksRequest.
// Note the StoreChunksRequest is validated as soon as it enters the node gRPC
// interface, see grpc.Server.validateStoreChunkRequest.
func BatchHeaderFromProtobuf(in *pb.BatchHeader) (*BatchHeader, error) {
if in == nil || len(in.GetBatchRoot()) == 0 {
return nil, fmt.Errorf("batch header is nil or empty")
}
var batchRoot [32]byte
copy(batchRoot[:], in.GetBatchRoot())
return &BatchHeader{
ReferenceBlockNumber: uint(in.GetReferenceBlockNumber()),
BatchRoot: batchRoot,
}, nil
}

// BlobHeaderFromProto constructs a core.BlobHeader from a proto of pb.BlobHeader.
func BlobHeaderFromProto(h *pb.BlobHeader) (*BlobHeader, error) {
if h == nil {
return nil, fmt.Errorf("GetBlobHeaderFromProto: blob header is nil")

}

commitX := new(fp.Element).SetBytes(h.GetCommitment().GetX())
commitY := new(fp.Element).SetBytes(h.GetCommitment().GetY())
commitment := &encoding.G1Commitment{
X: *commitX,
Y: *commitY,
}

if !(*bn254.G1Affine)(commitment).IsInSubGroup() {
return nil, errors.New("commitment is not in the subgroup")
}

var lengthCommitment, lengthProof encoding.G2Commitment
if h.GetLengthCommitment() != nil {
lengthCommitment.X.A0 = *new(fp.Element).SetBytes(h.GetLengthCommitment().GetXA0())
lengthCommitment.X.A1 = *new(fp.Element).SetBytes(h.GetLengthCommitment().GetXA1())
lengthCommitment.Y.A0 = *new(fp.Element).SetBytes(h.GetLengthCommitment().GetYA0())
lengthCommitment.Y.A1 = *new(fp.Element).SetBytes(h.GetLengthCommitment().GetYA1())
}

if !(*bn254.G2Affine)(&lengthCommitment).IsInSubGroup() {
return nil, errors.New("lengthCommitment is not in the subgroup")
}

if h.GetLengthProof() != nil {
lengthProof.X.A0 = *new(fp.Element).SetBytes(h.GetLengthProof().GetXA0())
lengthProof.X.A1 = *new(fp.Element).SetBytes(h.GetLengthProof().GetXA1())
lengthProof.Y.A0 = *new(fp.Element).SetBytes(h.GetLengthProof().GetYA0())
lengthProof.Y.A1 = *new(fp.Element).SetBytes(h.GetLengthProof().GetYA1())
}

if !(*bn254.G2Affine)(&lengthProof).IsInSubGroup() {
return nil, errors.New("lengthProof is not in the subgroup")
}

quorumHeaders := make([]*BlobQuorumInfo, len(h.GetQuorumHeaders()))
for i, header := range h.GetQuorumHeaders() {
if header.GetQuorumId() > MaxQuorumID {
return nil, api.NewErrorInvalidArg(fmt.Sprintf("quorum ID must be in range [0, %d], but found %d", MaxQuorumID, header.GetQuorumId()))
}
if err := ValidateSecurityParam(header.GetConfirmationThreshold(), header.GetAdversaryThreshold()); err != nil {
return nil, err
}

quorumHeaders[i] = &BlobQuorumInfo{
SecurityParam: SecurityParam{
QuorumID: QuorumID(header.GetQuorumId()),
AdversaryThreshold: uint8(header.GetAdversaryThreshold()),
ConfirmationThreshold: uint8(header.GetConfirmationThreshold()),
QuorumRate: header.GetRatelimit(),
},
ChunkLength: uint(header.GetChunkLength()),
}
}

return &BlobHeader{
BlobCommitments: encoding.BlobCommitments{
Commitment: commitment,
LengthCommitment: &lengthCommitment,
LengthProof: &lengthProof,
Length: uint(h.GetLength()),
},
QuorumInfos: quorumHeaders,
AccountID: h.AccountId,
}, nil
}

func encode(obj any) ([]byte, error) {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
Expand Down
Loading

0 comments on commit 36bd63b

Please sign in to comment.