diff --git a/api/clients/mock/relay_client.go b/api/clients/mock/relay_client.go new file mode 100644 index 0000000000..68f09b57a3 --- /dev/null +++ b/api/clients/mock/relay_client.go @@ -0,0 +1,39 @@ +package mock + +import ( + "context" + + "github.com/Layr-Labs/eigenda/api/clients" + corev2 "github.com/Layr-Labs/eigenda/core/v2" + "github.com/stretchr/testify/mock" +) + +type MockRelayClient struct { + mock.Mock +} + +var _ clients.RelayClient = (*MockRelayClient)(nil) + +func NewRelayClient() *MockRelayClient { + return &MockRelayClient{} +} + +func (c *MockRelayClient) GetBlob(ctx context.Context, relayKey corev2.RelayKey, blobKey corev2.BlobKey) ([]byte, error) { + args := c.Called(blobKey) + return args.Get(0).([]byte), args.Error(1) +} + +func (c *MockRelayClient) GetChunksByRange(ctx context.Context, relayKey corev2.RelayKey, requests []*clients.ChunkRequestByRange) ([][]byte, error) { + args := c.Called() + return args.Get(0).([][]byte), args.Error(1) +} + +func (c *MockRelayClient) GetChunksByIndex(ctx context.Context, relayKey corev2.RelayKey, requests []*clients.ChunkRequestByIndex) ([][]byte, error) { + args := c.Called() + return args.Get(0).([][]byte), args.Error(1) +} + +func (c *MockRelayClient) Close() error { + args := c.Called() + return args.Error(0) +} diff --git a/api/clients/node_client.go b/api/clients/node_client.go index 9d620da2ae..bbf0a89a77 100644 --- a/api/clients/node_client.go +++ b/api/clients/node_client.go @@ -8,7 +8,6 @@ import ( grpcnode "github.com/Layr-Labs/eigenda/api/grpc/node" "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/encoding" - "github.com/Layr-Labs/eigenda/node" "github.com/wealdtech/go-merkletree/v2" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -64,7 +63,7 @@ func (c client) GetBlobHeader( return nil, nil, err } - blobHeader, err := node.GetBlobHeaderFromProto(reply.GetBlobHeader()) + blobHeader, err := core.BlobHeaderFromProto(reply.GetBlobHeader()) if err != nil { return nil, nil, err } diff --git a/api/clients/relay_client.go b/api/clients/relay_client.go new file mode 100644 index 0000000000..10dccfb7b7 --- /dev/null +++ b/api/clients/relay_client.go @@ -0,0 +1,206 @@ +package clients + +import ( + "context" + "fmt" + "sync" + + relaygrpc "github.com/Layr-Labs/eigenda/api/grpc/relay" + corev2 "github.com/Layr-Labs/eigenda/core/v2" + "github.com/Layr-Labs/eigensdk-go/logging" + "github.com/hashicorp/go-multierror" + "google.golang.org/grpc" +) + +type RelayClientConfig struct { + Sockets map[corev2.RelayKey]string + UseSecureGrpcFlag bool +} + +type ChunkRequestByRange struct { + BlobKey corev2.BlobKey + Start uint32 + End uint32 +} + +type ChunkRequestByIndex struct { + BlobKey corev2.BlobKey + Indexes []uint32 +} + +type RelayClient interface { + // GetBlob retrieves a blob from a relay + GetBlob(ctx context.Context, relayKey corev2.RelayKey, blobKey corev2.BlobKey) ([]byte, error) + // GetChunksByRange retrieves blob chunks from a relay by chunk index range + // The returned slice has the same length and ordering as the input slice, and the i-th element is the bundle for the i-th request. + // Each bundle is a sequence of frames in raw form (i.e., serialized core.Bundle bytearray). + GetChunksByRange(ctx context.Context, relayKey corev2.RelayKey, requests []*ChunkRequestByRange) ([][]byte, error) + // GetChunksByIndex retrieves blob chunks from a relay by index + // The returned slice has the same length and ordering as the input slice, and the i-th element is the bundle for the i-th request. + // Each bundle is a sequence of frames in raw form (i.e., serialized core.Bundle bytearray). + GetChunksByIndex(ctx context.Context, relayKey corev2.RelayKey, requests []*ChunkRequestByIndex) ([][]byte, error) + Close() error +} + +type relayClient struct { + config *RelayClientConfig + + initOnce map[corev2.RelayKey]*sync.Once + conns map[corev2.RelayKey]*grpc.ClientConn + logger logging.Logger + + grpcClients map[corev2.RelayKey]relaygrpc.RelayClient +} + +var _ RelayClient = (*relayClient)(nil) + +// NewRelayClient creates a new RelayClient that connects to the relays specified in the config. +// It keeps a connection to each relay and reuses it for subsequent requests, and the connection is lazily instantiated. +func NewRelayClient(config *RelayClientConfig, logger logging.Logger) (*relayClient, error) { + if config == nil || len(config.Sockets) > 0 { + return nil, fmt.Errorf("invalid config: %v", config) + } + + initOnce := make(map[corev2.RelayKey]*sync.Once) + conns := make(map[corev2.RelayKey]*grpc.ClientConn) + grpcClients := make(map[corev2.RelayKey]relaygrpc.RelayClient) + for key := range config.Sockets { + initOnce[key] = &sync.Once{} + } + return &relayClient{ + config: config, + + initOnce: initOnce, + conns: conns, + logger: logger, + + grpcClients: grpcClients, + }, nil +} + +func (c *relayClient) GetBlob(ctx context.Context, relayKey corev2.RelayKey, blobKey corev2.BlobKey) ([]byte, error) { + if err := c.initOnceGrpcConnection(relayKey); err != nil { + return nil, err + } + + client, ok := c.grpcClients[relayKey] + if !ok { + return nil, fmt.Errorf("no grpc client for relay key: %v", relayKey) + } + + res, err := client.GetBlob(ctx, &relaygrpc.GetBlobRequest{ + BlobKey: blobKey[:], + }) + if err != nil { + return nil, err + } + + return res.GetBlob(), nil +} + +func (c *relayClient) GetChunksByRange(ctx context.Context, relayKey corev2.RelayKey, requests []*ChunkRequestByRange) ([][]byte, error) { + if len(requests) == 0 { + return nil, fmt.Errorf("no requests") + } + if err := c.initOnceGrpcConnection(relayKey); err != nil { + return nil, err + } + + client, ok := c.grpcClients[relayKey] + if !ok { + return nil, fmt.Errorf("no grpc client for relay key: %v", relayKey) + } + + grpcRequests := make([]*relaygrpc.ChunkRequest, len(requests)) + for i, req := range requests { + grpcRequests[i] = &relaygrpc.ChunkRequest{ + Request: &relaygrpc.ChunkRequest_ByRange{ + ByRange: &relaygrpc.ChunkRequestByRange{ + BlobKey: req.BlobKey[:], + StartIndex: req.Start, + EndIndex: req.End, + }, + }, + } + } + res, err := client.GetChunks(ctx, &relaygrpc.GetChunksRequest{ + ChunkRequests: grpcRequests, + }) + + if err != nil { + return nil, err + } + + return res.GetData(), nil +} + +func (c *relayClient) GetChunksByIndex(ctx context.Context, relayKey corev2.RelayKey, requests []*ChunkRequestByIndex) ([][]byte, error) { + if len(requests) == 0 { + return nil, fmt.Errorf("no requests") + } + if err := c.initOnceGrpcConnection(relayKey); err != nil { + return nil, err + } + + client, ok := c.grpcClients[relayKey] + if !ok { + return nil, fmt.Errorf("no grpc client for relay key: %v", relayKey) + } + + grpcRequests := make([]*relaygrpc.ChunkRequest, len(requests)) + for i, req := range requests { + grpcRequests[i] = &relaygrpc.ChunkRequest{ + Request: &relaygrpc.ChunkRequest_ByIndex{ + ByIndex: &relaygrpc.ChunkRequestByIndex{ + BlobKey: req.BlobKey[:], + ChunkIndices: req.Indexes, + }, + }, + } + } + res, err := client.GetChunks(ctx, &relaygrpc.GetChunksRequest{ + ChunkRequests: grpcRequests, + }) + + if err != nil { + return nil, err + } + + return res.GetData(), nil +} + +func (c *relayClient) initOnceGrpcConnection(key corev2.RelayKey) error { + var initErr error + c.initOnce[key].Do(func() { + socket, ok := c.config.Sockets[key] + if !ok { + initErr = fmt.Errorf("unknown relay key: %v", key) + return + } + dialOptions := getGrpcDialOptions(c.config.UseSecureGrpcFlag) + conn, err := grpc.Dial(socket, dialOptions...) + if err != nil { + initErr = err + return + } + c.conns[key] = conn + c.grpcClients[key] = relaygrpc.NewRelayClient(conn) + }) + return initErr +} + +func (c *relayClient) Close() error { + var errList *multierror.Error + for k, conn := range c.conns { + if conn != nil { + err := conn.Close() + conn = nil + c.grpcClients[k] = nil + if err != nil { + c.logger.Error("failed to close connection", "err", err) + errList = multierror.Append(errList, err) + } + } + } + return errList.ErrorOrNil() +} diff --git a/core/serialization.go b/core/serialization.go index a0159efabd..534a52cccf 100644 --- a/core/serialization.go +++ b/core/serialization.go @@ -10,9 +10,13 @@ import ( "regexp" "slices" + "github.com/Layr-Labs/eigenda/api" binding "github.com/Layr-Labs/eigenda/contracts/bindings/EigenDAServiceManager" + "github.com/Layr-Labs/eigenda/encoding" "github.com/consensys/gnark-crypto/ecc/bn254" + "github.com/consensys/gnark-crypto/ecc/bn254/fp" + pb "github.com/Layr-Labs/eigenda/api/grpc/node" "github.com/ethereum/go-ethereum/accounts/abi" "github.com/wealdtech/go-merkletree/v2" "github.com/wealdtech/go-merkletree/v2/keccak256" @@ -393,6 +397,94 @@ func (h *BlobHeader) Deserialize(data []byte) (*BlobHeader, error) { return h, err } +// GetBatchHeader constructs a core.BatchHeader from a proto of pb.StoreChunksRequest. +// Note the StoreChunksRequest is validated as soon as it enters the node gRPC +// interface, see grpc.Server.validateStoreChunkRequest. +func BatchHeaderFromProtobuf(in *pb.BatchHeader) (*BatchHeader, error) { + if in == nil || len(in.GetBatchRoot()) == 0 { + return nil, fmt.Errorf("batch header is nil or empty") + } + var batchRoot [32]byte + copy(batchRoot[:], in.GetBatchRoot()) + return &BatchHeader{ + ReferenceBlockNumber: uint(in.GetReferenceBlockNumber()), + BatchRoot: batchRoot, + }, nil +} + +// BlobHeaderFromProto constructs a core.BlobHeader from a proto of pb.BlobHeader. +func BlobHeaderFromProto(h *pb.BlobHeader) (*BlobHeader, error) { + if h == nil { + return nil, fmt.Errorf("GetBlobHeaderFromProto: blob header is nil") + + } + + commitX := new(fp.Element).SetBytes(h.GetCommitment().GetX()) + commitY := new(fp.Element).SetBytes(h.GetCommitment().GetY()) + commitment := &encoding.G1Commitment{ + X: *commitX, + Y: *commitY, + } + + if !(*bn254.G1Affine)(commitment).IsInSubGroup() { + return nil, errors.New("commitment is not in the subgroup") + } + + var lengthCommitment, lengthProof encoding.G2Commitment + if h.GetLengthCommitment() != nil { + lengthCommitment.X.A0 = *new(fp.Element).SetBytes(h.GetLengthCommitment().GetXA0()) + lengthCommitment.X.A1 = *new(fp.Element).SetBytes(h.GetLengthCommitment().GetXA1()) + lengthCommitment.Y.A0 = *new(fp.Element).SetBytes(h.GetLengthCommitment().GetYA0()) + lengthCommitment.Y.A1 = *new(fp.Element).SetBytes(h.GetLengthCommitment().GetYA1()) + } + + if !(*bn254.G2Affine)(&lengthCommitment).IsInSubGroup() { + return nil, errors.New("lengthCommitment is not in the subgroup") + } + + if h.GetLengthProof() != nil { + lengthProof.X.A0 = *new(fp.Element).SetBytes(h.GetLengthProof().GetXA0()) + lengthProof.X.A1 = *new(fp.Element).SetBytes(h.GetLengthProof().GetXA1()) + lengthProof.Y.A0 = *new(fp.Element).SetBytes(h.GetLengthProof().GetYA0()) + lengthProof.Y.A1 = *new(fp.Element).SetBytes(h.GetLengthProof().GetYA1()) + } + + if !(*bn254.G2Affine)(&lengthProof).IsInSubGroup() { + return nil, errors.New("lengthProof is not in the subgroup") + } + + quorumHeaders := make([]*BlobQuorumInfo, len(h.GetQuorumHeaders())) + for i, header := range h.GetQuorumHeaders() { + if header.GetQuorumId() > MaxQuorumID { + return nil, api.NewErrorInvalidArg(fmt.Sprintf("quorum ID must be in range [0, %d], but found %d", MaxQuorumID, header.GetQuorumId())) + } + if err := ValidateSecurityParam(header.GetConfirmationThreshold(), header.GetAdversaryThreshold()); err != nil { + return nil, err + } + + quorumHeaders[i] = &BlobQuorumInfo{ + SecurityParam: SecurityParam{ + QuorumID: QuorumID(header.GetQuorumId()), + AdversaryThreshold: uint8(header.GetAdversaryThreshold()), + ConfirmationThreshold: uint8(header.GetConfirmationThreshold()), + QuorumRate: header.GetRatelimit(), + }, + ChunkLength: uint(header.GetChunkLength()), + } + } + + return &BlobHeader{ + BlobCommitments: encoding.BlobCommitments{ + Commitment: commitment, + LengthCommitment: &lengthCommitment, + LengthProof: &lengthProof, + Length: uint(h.GetLength()), + }, + QuorumInfos: quorumHeaders, + AccountID: h.AccountId, + }, nil +} + func encode(obj any) ([]byte, error) { var buf bytes.Buffer enc := gob.NewEncoder(&buf) diff --git a/node/grpc/server.go b/node/grpc/server.go index f6fa598411..db6b3bfbf2 100644 --- a/node/grpc/server.go +++ b/node/grpc/server.go @@ -72,9 +72,9 @@ func (s *Server) handleStoreChunksRequest(ctx context.Context, in *pb.StoreChunk start := time.Now() // Get batch header hash - batchHeader, err := node.GetBatchHeader(in.GetBatchHeader()) + batchHeader, err := core.BatchHeaderFromProtobuf(in.GetBatchHeader()) if err != nil { - return nil, err + return nil, api.NewErrorInvalidArg(err.Error()) } blobs, err := node.GetBlobMessages(in.GetBlobs(), s.node.Config.NumBatchDeserializationWorkers) @@ -313,7 +313,7 @@ func (s *Server) rebuildMerkleTree(batchHeaderHash [32]byte) (*merkletree.Merkle return nil, err } - blobHeader, err := node.GetBlobHeaderFromProto(&protoBlobHeader) + blobHeader, err := core.BlobHeaderFromProto(&protoBlobHeader) if err != nil { return nil, err } @@ -355,7 +355,7 @@ func (s *Server) getBlobHeader(ctx context.Context, batchHeaderHash [32]byte, bl return nil, nil, err } - blobHeader, err := node.GetBlobHeaderFromProto(&protoBlobHeader) + blobHeader, err := core.BlobHeaderFromProto(&protoBlobHeader) if err != nil { return nil, nil, err } diff --git a/node/node.go b/node/node.go index a45097f246..c657d791bf 100644 --- a/node/node.go +++ b/node/node.go @@ -441,7 +441,7 @@ func (n *Node) ValidateBatchContents(ctx context.Context, blobHeaderHashes [][32 return errors.New("blob headers have different reference block numbers") } - blobHeader, err := GetBlobHeaderFromProto(&protoBlobHeader) + blobHeader, err := core.BlobHeaderFromProto(&protoBlobHeader) if err != nil { return fmt.Errorf("failed to get blob header from proto: %w", err) } diff --git a/node/store_test.go b/node/store_test.go index 29c92ae5df..af3c786e3b 100644 --- a/node/store_test.go +++ b/node/store_test.go @@ -475,7 +475,7 @@ func TestStoreBatchBlobMapping(t *testing.T) { assert.Nil(t, err) err = proto.Unmarshal(blobHeaderBytes0, &protoBlobHeader) assert.Nil(t, err) - blobHeader0, err := node.GetBlobHeaderFromProto(&protoBlobHeader) + blobHeader0, err := core.BlobHeaderFromProto(&protoBlobHeader) assert.Nil(t, err) assert.Equal(t, blobHeader0, blobs[0].BlobHeader) @@ -483,7 +483,7 @@ func TestStoreBatchBlobMapping(t *testing.T) { assert.Nil(t, err) err = proto.Unmarshal(blobHeaderBytes1, &protoBlobHeader) assert.Nil(t, err) - blobHeader1, err := node.GetBlobHeaderFromProto(&protoBlobHeader) + blobHeader1, err := core.BlobHeaderFromProto(&protoBlobHeader) assert.Nil(t, err) assert.Equal(t, blobHeader1, blobs[1].BlobHeader) blobHeaderBytes2, err := s.GetBlobHeader(ctx, batchHeaderHash, 2) diff --git a/node/utils.go b/node/utils.go index 2559dd2e80..6f2900317d 100644 --- a/node/utils.go +++ b/node/utils.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" - "github.com/Layr-Labs/eigenda/api" pb "github.com/Layr-Labs/eigenda/api/grpc/node" "github.com/Layr-Labs/eigenda/common/pubip" "github.com/Layr-Labs/eigenda/core" @@ -15,22 +14,6 @@ import ( "github.com/gammazero/workerpool" ) -// GetBatchHeader constructs a core.BatchHeader from a proto of pb.StoreChunksRequest. -// Note the StoreChunksRequest is validated as soon as it enters the node gRPC -// interface, see grpc.Server.validateStoreChunkRequest. -func GetBatchHeader(in *pb.BatchHeader) (*core.BatchHeader, error) { - if in == nil || len(in.GetBatchRoot()) == 0 { - return nil, api.NewErrorInvalidArg("batch header is nil or empty") - } - var batchRoot [32]byte - copy(batchRoot[:], in.GetBatchRoot()) - batchHeader := core.BatchHeader{ - ReferenceBlockNumber: uint(in.GetReferenceBlockNumber()), - BatchRoot: batchRoot, - } - return &batchHeader, nil -} - // GetBlobMessages constructs a core.BlobMessage array from blob protobufs. // Note the proto request is validated as soon as it enters the node gRPC // interface. This method assumes the blobs are valid. @@ -42,8 +25,7 @@ func GetBlobMessages(pbBlobs []*pb.Blob, numWorkers int) ([]*core.BlobMessage, e i := i blob := blob pool.Submit(func() { - blobHeader, err := GetBlobHeaderFromProto(blob.GetHeader()) - + blobHeader, err := core.BlobHeaderFromProto(blob.GetHeader()) if err != nil { resultChan <- err return @@ -139,80 +121,6 @@ func ValidatePointsFromBlobHeader(h *pb.BlobHeader) error { return nil } -// GetBlobHeaderFromProto constructs a core.BlobHeader from a proto of pb.BlobHeader. -func GetBlobHeaderFromProto(h *pb.BlobHeader) (*core.BlobHeader, error) { - - if h == nil { - return nil, api.NewErrorInvalidArg("GetBlobHeaderFromProto: blob header is nil") - - } - - commitX := new(fp.Element).SetBytes(h.GetCommitment().GetX()) - commitY := new(fp.Element).SetBytes(h.GetCommitment().GetY()) - commitment := &encoding.G1Commitment{ - X: *commitX, - Y: *commitY, - } - - if !(*bn254.G1Affine)(commitment).IsInSubGroup() { - return nil, errors.New("commitment is not in the subgroup") - } - - var lengthCommitment, lengthProof encoding.G2Commitment - if h.GetLengthCommitment() != nil { - lengthCommitment.X.A0 = *new(fp.Element).SetBytes(h.GetLengthCommitment().GetXA0()) - lengthCommitment.X.A1 = *new(fp.Element).SetBytes(h.GetLengthCommitment().GetXA1()) - lengthCommitment.Y.A0 = *new(fp.Element).SetBytes(h.GetLengthCommitment().GetYA0()) - lengthCommitment.Y.A1 = *new(fp.Element).SetBytes(h.GetLengthCommitment().GetYA1()) - } - - if !(*bn254.G2Affine)(&lengthCommitment).IsInSubGroup() { - return nil, errors.New("lengthCommitment is not in the subgroup") - } - - if h.GetLengthProof() != nil { - lengthProof.X.A0 = *new(fp.Element).SetBytes(h.GetLengthProof().GetXA0()) - lengthProof.X.A1 = *new(fp.Element).SetBytes(h.GetLengthProof().GetXA1()) - lengthProof.Y.A0 = *new(fp.Element).SetBytes(h.GetLengthProof().GetYA0()) - lengthProof.Y.A1 = *new(fp.Element).SetBytes(h.GetLengthProof().GetYA1()) - } - - if !(*bn254.G2Affine)(&lengthProof).IsInSubGroup() { - return nil, errors.New("lengthProof is not in the subgroup") - } - - quorumHeaders := make([]*core.BlobQuorumInfo, len(h.GetQuorumHeaders())) - for i, header := range h.GetQuorumHeaders() { - if header.GetQuorumId() > core.MaxQuorumID { - return nil, api.NewErrorInvalidArg(fmt.Sprintf("quorum ID must be in range [0, %d], but found %d", core.MaxQuorumID, header.GetQuorumId())) - } - if err := core.ValidateSecurityParam(header.GetConfirmationThreshold(), header.GetAdversaryThreshold()); err != nil { - return nil, err - } - - quorumHeaders[i] = &core.BlobQuorumInfo{ - SecurityParam: core.SecurityParam{ - QuorumID: core.QuorumID(header.GetQuorumId()), - AdversaryThreshold: uint8(header.GetAdversaryThreshold()), - ConfirmationThreshold: uint8(header.GetConfirmationThreshold()), - QuorumRate: header.GetRatelimit(), - }, - ChunkLength: uint(header.GetChunkLength()), - } - } - - return &core.BlobHeader{ - BlobCommitments: encoding.BlobCommitments{ - Commitment: commitment, - LengthCommitment: &lengthCommitment, - LengthProof: &lengthProof, - Length: uint(h.GetLength()), - }, - QuorumInfos: quorumHeaders, - AccountID: h.AccountId, - }, nil -} - func SocketAddress(ctx context.Context, provider pubip.Provider, dispersalPort string, retrievalPort string) (string, error) { ip, err := provider.PublicIPAddress(ctx) if err != nil { diff --git a/test/integration_test.go b/test/integration_test.go index 407e1effed..914fda9b84 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -648,7 +648,7 @@ func TestDispersalAndRetrieval(t *testing.T) { assert.Greater(t, headerReply.GetBlobHeader().GetQuorumHeaders()[0].GetChunkLength(), uint32(0)) if blobHeader == nil { - blobHeader, err = node.GetBlobHeaderFromProto(headerReply.GetBlobHeader()) + blobHeader, err = core.BlobHeaderFromProto(headerReply.GetBlobHeader()) assert.NoError(t, err) }