[v2] Node DownloadBatch method #880

Merged 1 commit on Nov 13, 2024
8 changes: 7 additions & 1 deletion api/clients/mock/relay_client.go
@@ -20,11 +20,17 @@ func NewRelayClient() *MockRelayClient {

func (c *MockRelayClient) GetBlob(ctx context.Context, relayKey corev2.RelayKey, blobKey corev2.BlobKey) ([]byte, error) {
args := c.Called(blobKey)
if args.Get(0) == nil {
return nil, args.Error(1)
}
return args.Get(0).([]byte), args.Error(1)
}

func (c *MockRelayClient) GetChunksByRange(ctx context.Context, relayKey corev2.RelayKey, requests []*clients.ChunkRequestByRange) ([][]byte, error) {
args := c.Called()
args := c.Called(ctx, relayKey, requests)
if args.Get(0) == nil {
return nil, args.Error(1)
}
return args.Get(0).([][]byte), args.Error(1)
}

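Note on the mock change above: because c.Called now receives ctx, relayKey, and requests, tests can set argument-aware expectations instead of matching any call. A minimal sketch of that usage, assuming MockRelayClient embeds testify's mock.Mock (as the c.Called calls suggest) and that the package lives at the import path implied by the diff header:

package example_test

import (
	"context"
	"testing"

	clientsmock "github.com/Layr-Labs/eigenda/api/clients/mock"
	corev2 "github.com/Layr-Labs/eigenda/core/v2"
	"github.com/stretchr/testify/mock"
	"github.com/stretchr/testify/require"
)

// Illustration only: expectations can now be scoped to a specific relay key.
func TestRelayClientMockMatchesArguments(t *testing.T) {
	client := clientsmock.NewRelayClient()
	client.On("GetChunksByRange", mock.Anything, corev2.RelayKey(1), mock.Anything).
		Return([][]byte{{0x01}}, nil)

	chunks, err := client.GetChunksByRange(context.Background(), corev2.RelayKey(1), nil)
	require.NoError(t, err)
	require.Len(t, chunks, 1)
}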
2 changes: 1 addition & 1 deletion api/clients/node_client.go
@@ -63,7 +63,7 @@ func (c client) GetBlobHeader(
return nil, nil, err
}

blobHeader, err := core.BlobHeaderFromProto(reply.GetBlobHeader())
blobHeader, err := core.BlobHeaderFromProtobuf(reply.GetBlobHeader())
if err != nil {
return nil, nil, err
}
4 changes: 2 additions & 2 deletions core/serialization.go
@@ -412,8 +412,8 @@ func BatchHeaderFromProtobuf(in *pb.BatchHeader) (*BatchHeader, error) {
}, nil
}

// BlobHeaderFromProto constructs a core.BlobHeader from a proto of pb.BlobHeader.
func BlobHeaderFromProto(h *pb.BlobHeader) (*BlobHeader, error) {
// BlobHeaderFromProtobuf constructs a core.BlobHeader from a proto of pb.BlobHeader.
func BlobHeaderFromProtobuf(h *pb.BlobHeader) (*BlobHeader, error) {
if h == nil {
return nil, fmt.Errorf("GetBlobHeaderFromProto: blob header is nil")

6 changes: 3 additions & 3 deletions core/v2/core_test.go
@@ -182,14 +182,14 @@ func prepareBlobs(
}
if len(inverseMap[operatorID]) < blobIndex+1 {
inverseMap[operatorID] = append(inverseMap[operatorID], &corev2.BlobShard{
BlobCertificate: certs[blobIndex],
Chunks: make(map[core.QuorumID][]*encoding.Frame),
BlobCertificate: &certs[blobIndex],
Bundles: make(map[core.QuorumID]core.Bundle),
})
}
if len(frames) == 0 {
continue
}
inverseMap[operatorID][blobIndex].Chunks[quorum] = append(inverseMap[operatorID][blobIndex].Chunks[quorum], frames...)
inverseMap[operatorID][blobIndex].Bundles[quorum] = append(inverseMap[operatorID][blobIndex].Bundles[quorum], frames...)

}
}
71 changes: 71 additions & 0 deletions core/v2/types.go
@@ -211,6 +211,77 @@ type Batch struct {
BlobCertificates []*BlobCertificate
}

func (b *Batch) ToProtobuf() (*commonpb.Batch, error) {
if b.BatchHeader == nil {
return nil, errors.New("batch header is nil")
}

if b.BatchHeader.BatchRoot == [32]byte{} {
return nil, errors.New("batch root is empty")
}

if b.BatchHeader.ReferenceBlockNumber == 0 {
return nil, errors.New("reference block number is 0")
}

blobCerts := make([]*commonpb.BlobCertificate, len(b.BlobCertificates))
for i, cert := range b.BlobCertificates {
blobCert, err := cert.ToProtobuf()
if err != nil {
return nil, fmt.Errorf("failed to convert blob certificate to protobuf: %v", err)
}
blobCerts[i] = blobCert
}

return &commonpb.Batch{
Header: &commonpb.BatchHeader{
BatchRoot: b.BatchHeader.BatchRoot[:],
ReferenceBlockNumber: b.BatchHeader.ReferenceBlockNumber,
},
BlobCertificates: blobCerts,
}, nil
}

func BatchFromProtobuf(proto *commonpb.Batch) (*Batch, error) {
if len(proto.GetBlobCertificates()) == 0 {
return nil, errors.New("missing blob certificates in batch")
}

if proto.GetHeader() == nil {
return nil, errors.New("missing header in batch")
}

if len(proto.GetHeader().GetBatchRoot()) != 32 {
return nil, errors.New("batch root must be 32 bytes")
}

batchHeader := &BatchHeader{
BatchRoot: [32]byte(proto.GetHeader().GetBatchRoot()),
ReferenceBlockNumber: proto.GetHeader().GetReferenceBlockNumber(),
}

blobCerts := make([]*BlobCertificate, len(proto.GetBlobCertificates()))
for i, cert := range proto.GetBlobCertificates() {
blobHeader, err := NewBlobHeader(cert.GetBlobHeader())
if err != nil {
return nil, fmt.Errorf("failed to create blob header: %v", err)
}

blobCerts[i] = &BlobCertificate{
BlobHeader: blobHeader,
RelayKeys: make([]RelayKey, len(cert.GetRelays())),
}
for j, r := range cert.GetRelays() {
blobCerts[i].RelayKeys[j] = RelayKey(r)
}
}

return &Batch{
BatchHeader: batchHeader,
BlobCertificates: blobCerts,
}, nil
}

type Attestation struct {
*BatchHeader

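Aside on BatchFromProtobuf above: the [32]byte(proto.GetHeader().GetBatchRoot()) expression relies on Go 1.20's slice-to-array conversion (safe here because the length is checked first); on older toolchains the equivalent is an explicit copy:

// Pre-Go-1.20 equivalent of [32]byte(proto.GetHeader().GetBatchRoot()):
var batchRoot [32]byte
copy(batchRoot[:], proto.GetHeader().GetBatchRoot())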
67 changes: 67 additions & 0 deletions core/v2/types_test.go
@@ -0,0 +1,67 @@
package v2_test

import (
"math/big"
"testing"

"github.com/Layr-Labs/eigenda/core"
v2 "github.com/Layr-Labs/eigenda/core/v2"
"github.com/Layr-Labs/eigenda/encoding/utils/codec"
"github.com/stretchr/testify/assert"
)

func TestConvertBatchToFromProtobuf(t *testing.T) {
data := codec.ConvertByPaddingEmptyByte(GETTYSBURG_ADDRESS_BYTES)
commitments, err := p.GetCommitments(data)
if err != nil {
t.Fatal(err)
}

bh0 := &v2.BlobHeader{
BlobVersion: 0,
BlobCommitments: commitments,
QuorumNumbers: []core.QuorumID{0, 1},
PaymentMetadata: core.PaymentMetadata{
AccountID: "0x123",
BinIndex: 5,
CumulativePayment: big.NewInt(100),
},
Signature: []byte{1, 2, 3},
}
bh1 := &v2.BlobHeader{
BlobVersion: 0,
BlobCommitments: commitments,
QuorumNumbers: []core.QuorumID{0, 1},
PaymentMetadata: core.PaymentMetadata{
AccountID: "0x456",
BinIndex: 6,
CumulativePayment: big.NewInt(200),
},
Signature: []byte{1, 2, 3},
}

blobCert0 := &v2.BlobCertificate{
BlobHeader: bh0,
RelayKeys: []v2.RelayKey{0, 1},
}
blobCert1 := &v2.BlobCertificate{
BlobHeader: bh1,
RelayKeys: []v2.RelayKey{2, 3},
}

batch := &v2.Batch{
BatchHeader: &v2.BatchHeader{
BatchRoot: [32]byte{1, 1, 1},
ReferenceBlockNumber: 100,
},
BlobCertificates: []*v2.BlobCertificate{blobCert0, blobCert1},
}

pb, err := batch.ToProtobuf()
assert.NoError(t, err)

newBatch, err := v2.BatchFromProtobuf(pb)
assert.NoError(t, err)

assert.Equal(t, batch, newBatch)
}
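
The round-trip test above exercises only the happy path. A hypothetical companion test (not part of this PR) sketching the validation errors Batch.ToProtobuf returns for malformed headers:

package v2_test

import (
	"testing"

	v2 "github.com/Layr-Labs/eigenda/core/v2"
	"github.com/stretchr/testify/assert"
)

// Hypothetical test, not included in this PR: checks the error paths added in types.go.
func TestBatchToProtobufRejectsInvalidHeaders(t *testing.T) {
	_, err := (&v2.Batch{}).ToProtobuf()
	assert.ErrorContains(t, err, "batch header is nil")

	_, err = (&v2.Batch{BatchHeader: &v2.BatchHeader{}}).ToProtobuf()
	assert.ErrorContains(t, err, "batch root is empty")

	_, err = (&v2.Batch{BatchHeader: &v2.BatchHeader{BatchRoot: [32]byte{1}}}).ToProtobuf()
	assert.ErrorContains(t, err, "reference block number is 0")
}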
17 changes: 8 additions & 9 deletions core/v2/validator.go
@@ -16,8 +16,8 @@ var (
)

type BlobShard struct {
BlobCertificate
Chunks map[core.QuorumID][]*encoding.Frame
*BlobCertificate
Bundles core.Bundles
}

// shardValidator implements the validation logic that a DA node should apply to its received data
@@ -52,18 +52,17 @@ func (v *ShardValidator) validateBlobQuorum(quorum core.QuorumID, blob *BlobShar
if assignment.NumChunks == 0 {
return nil, nil, fmt.Errorf("%w: operator %s has no chunks in quorum %d", ErrBlobQuorumSkip, v.operatorID.Hex(), quorum)
}
if assignment.NumChunks != uint32(len(blob.Chunks[quorum])) {
return nil, nil, fmt.Errorf("number of chunks (%d) does not match assignment (%d) for quorum %d", len(blob.Chunks[quorum]), assignment.NumChunks, quorum)
if assignment.NumChunks != uint32(len(blob.Bundles[quorum])) {
return nil, nil, fmt.Errorf("number of chunks (%d) does not match assignment (%d) for quorum %d", len(blob.Bundles[quorum]), assignment.NumChunks, quorum)
}

// Validate the chunkLength against the confirmation and adversary threshold parameters
// Get the chunk length
chunkLength, err := GetChunkLength(blob.BlobHeader.BlobVersion, uint32(blob.BlobHeader.BlobCommitments.Length))
if err != nil {
return nil, nil, fmt.Errorf("invalid chunk length: %w", err)
}

// Get the chunk length
chunks := blob.Chunks[quorum]
chunks := blob.Bundles[quorum]
for _, chunk := range chunks {
if uint32(chunk.Length()) != chunkLength {
return nil, nil, fmt.Errorf("%w: chunk length (%d) does not match quorum header (%d) for quorum %d", ErrChunkLengthMismatch, chunk.Length(), chunkLength, quorum)
@@ -79,8 +78,8 @@ func (v *ShardValidator) ValidateBlobs(ctx context.Context, blobs []*BlobShard,
blobCommitmentList := make([]encoding.BlobCommitments, len(blobs))

for k, blob := range blobs {
if len(blob.Chunks) != len(blob.BlobHeader.QuorumNumbers) {
return fmt.Errorf("number of bundles (%d) does not match number of quorums (%d)", len(blob.Chunks), len(blob.BlobHeader.QuorumNumbers))
if len(blob.Bundles) != len(blob.BlobHeader.QuorumNumbers) {
return fmt.Errorf("number of bundles (%d) does not match number of quorums (%d)", len(blob.Bundles), len(blob.BlobHeader.QuorumNumbers))
}

state, err := v.chainState.GetOperatorState(ctx, uint(referenceBlockNumber), blob.BlobHeader.QuorumNumbers)
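For context on the BlobShard reshape above: the certificate is now referenced by pointer and chunks are grouped into per-quorum core.Bundles, matching the map shape used in core_test.go. A small illustrative helper (not from the PR) showing how a shard would be assembled before validation:

package example

import (
	"github.com/Layr-Labs/eigenda/core"
	corev2 "github.com/Layr-Labs/eigenda/core/v2"
	"github.com/Layr-Labs/eigenda/encoding"
)

// newBlobShard is a hypothetical helper mirroring the new BlobShard layout:
// one *BlobCertificate plus a bundle of frames per quorum.
func newBlobShard(cert *corev2.BlobCertificate, quorum core.QuorumID, frames []*encoding.Frame) *corev2.BlobShard {
	shard := &corev2.BlobShard{
		BlobCertificate: cert,
		Bundles:         make(map[core.QuorumID]core.Bundle),
	}
	shard.Bundles[quorum] = append(shard.Bundles[quorum], frames...)
	return shard
}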
4 changes: 2 additions & 2 deletions node/grpc/server.go
@@ -313,7 +313,7 @@ func (s *Server) rebuildMerkleTree(batchHeaderHash [32]byte) (*merkletree.Merkle
return nil, err
}

blobHeader, err := core.BlobHeaderFromProto(&protoBlobHeader)
blobHeader, err := core.BlobHeaderFromProtobuf(&protoBlobHeader)
if err != nil {
return nil, err
}
@@ -355,7 +355,7 @@ func (s *Server) getBlobHeader(ctx context.Context, batchHeaderHash [32]byte, bl
return nil, nil, err
}

blobHeader, err := core.BlobHeaderFromProto(&protoBlobHeader)
blobHeader, err := core.BlobHeaderFromProtobuf(&protoBlobHeader)
if err != nil {
return nil, nil, err
}
20 changes: 10 additions & 10 deletions node/grpc/server_v2.go
@@ -3,7 +3,6 @@ package grpc
import (
"context"
"runtime"
"sync"

"github.com/Layr-Labs/eigenda/api"
pb "github.com/Layr-Labs/eigenda/api/grpc/node/v2"
@@ -18,23 +17,24 @@ type ServerV2 struct {
pb.UnimplementedDispersalServer
pb.UnimplementedRetrievalServer

node *node.Node
config *node.Config
logger logging.Logger

config *node.Config
node *node.Node
ratelimiter common.RateLimiter

mu *sync.Mutex
logger logging.Logger
}

// NewServerV2 creates a new Server instance with the provided parameters.
func NewServerV2(config *node.Config, node *node.Node, logger logging.Logger, ratelimiter common.RateLimiter) *ServerV2 {
func NewServerV2(
config *node.Config,
node *node.Node,
logger logging.Logger,
ratelimiter common.RateLimiter,
) *ServerV2 {
return &ServerV2{
config: config,
logger: logger,
node: node,
ratelimiter: ratelimiter,
mu: &sync.Mutex{},
logger: logger,
}
}

13 changes: 12 additions & 1 deletion node/node.go
@@ -22,6 +22,7 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/prometheus/client_golang/prometheus"

"github.com/Layr-Labs/eigenda/api/clients"
"github.com/Layr-Labs/eigenda/api/grpc/node"
"github.com/Layr-Labs/eigenda/common/geth"
"github.com/Layr-Labs/eigenda/core"
@@ -62,12 +63,19 @@ type Node struct {
OperatorSocketsFilterer indexer.OperatorSocketsFilterer
ChainID *big.Int

RelayClient clients.RelayClient

mu sync.Mutex
CurrentSocket string
}

// NewNode creates a new Node with the provided config.
func NewNode(reg *prometheus.Registry, config *Config, pubIPProvider pubip.Provider, logger logging.Logger) (*Node, error) {
func NewNode(
reg *prometheus.Registry,
config *Config,
pubIPProvider pubip.Provider,
logger logging.Logger,
) (*Node, error) {
// Setup metrics
// sdkClients, err := buildSdkClients(config, logger)
// if err != nil {
@@ -160,6 +168,8 @@ func NewNode(reg *prometheus.Registry, config *Config, pubIPProvider pubip.Provi
"quorumIDs", fmt.Sprint(config.QuorumIDList), "registerNodeAtStart", config.RegisterNodeAtStart, "pubIPCheckInterval", config.PubIPCheckInterval,
"eigenDAServiceManagerAddr", config.EigenDAServiceManagerAddr, "blockStaleMeasure", blockStaleMeasure, "storeDurationBlocks", storeDurationBlocks, "enableGnarkBundleEncoding", config.EnableGnarkBundleEncoding)

var relayClient clients.RelayClient
// Create a new relay client with relay addresses onchain
return &Node{
Config: config,
Logger: nodeLogger,
Expand All @@ -173,6 +183,7 @@ func NewNode(reg *prometheus.Registry, config *Config, pubIPProvider pubip.Provi
PubIPProvider: pubIPProvider,
OperatorSocketsFilterer: socketsFilterer,
ChainID: chainID,
RelayClient: relayClient,
}, nil
}

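The DownloadBatch method named in the PR title is not part of this excerpt; only the RelayClient plumbing is shown here. As a rough, non-authoritative sketch, a method of the following shape could use the new field. Note that a real DA node would fetch only its assigned chunks via GetChunksByRange rather than whole blobs, and both the BlobKey() helper and the first-relay policy below are assumptions:

package node

import (
	"context"
	"errors"
	"fmt"

	corev2 "github.com/Layr-Labs/eigenda/core/v2"
)

// downloadBatchSketch is NOT the PR's implementation; it only illustrates how the
// new RelayClient field could be wired up to fetch data for each certificate.
func (n *Node) downloadBatchSketch(ctx context.Context, batch *corev2.Batch) ([][]byte, error) {
	if n.RelayClient == nil {
		return nil, errors.New("relay client is not configured")
	}
	blobs := make([][]byte, len(batch.BlobCertificates))
	for i, cert := range batch.BlobCertificates {
		if len(cert.RelayKeys) == 0 {
			return nil, fmt.Errorf("blob certificate %d has no relay keys", i)
		}
		relayKey := cert.RelayKeys[0]             // placeholder policy: a real node would choose/rotate relays
		blobKey, err := cert.BlobHeader.BlobKey() // assumes a BlobKey() helper on BlobHeader
		if err != nil {
			return nil, fmt.Errorf("failed to compute blob key: %w", err)
		}
		blob, err := n.RelayClient.GetBlob(ctx, relayKey, blobKey)
		if err != nil {
			return nil, fmt.Errorf("failed to fetch blob %d from relay %d: %w", i, relayKey, err)
		}
		blobs[i] = blob
	}
	return blobs, nil
}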