Skip to content

Commit

Permalink
[v2] Node DownloadBatch method (#880)
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim authored Nov 13, 2024
1 parent 685031a commit 72357a9
Show file tree
Hide file tree
Showing 15 changed files with 657 additions and 47 deletions.
8 changes: 7 additions & 1 deletion api/clients/mock/relay_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,17 @@ func NewRelayClient() *MockRelayClient {

func (c *MockRelayClient) GetBlob(ctx context.Context, relayKey corev2.RelayKey, blobKey corev2.BlobKey) ([]byte, error) {
args := c.Called(blobKey)
if args.Get(0) == nil {
return nil, args.Error(1)
}
return args.Get(0).([]byte), args.Error(1)
}

func (c *MockRelayClient) GetChunksByRange(ctx context.Context, relayKey corev2.RelayKey, requests []*clients.ChunkRequestByRange) ([][]byte, error) {
args := c.Called()
args := c.Called(ctx, relayKey, requests)
if args.Get(0) == nil {
return nil, args.Error(1)
}
return args.Get(0).([][]byte), args.Error(1)
}

Expand Down
2 changes: 1 addition & 1 deletion api/clients/node_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (c client) GetBlobHeader(
return nil, nil, err
}

blobHeader, err := core.BlobHeaderFromProto(reply.GetBlobHeader())
blobHeader, err := core.BlobHeaderFromProtobuf(reply.GetBlobHeader())
if err != nil {
return nil, nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions core/serialization.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,8 +412,8 @@ func BatchHeaderFromProtobuf(in *pb.BatchHeader) (*BatchHeader, error) {
}, nil
}

// BlobHeaderFromProto constructs a core.BlobHeader from a proto of pb.BlobHeader.
func BlobHeaderFromProto(h *pb.BlobHeader) (*BlobHeader, error) {
// BlobHeaderFromProtobuf constructs a core.BlobHeader from a proto of pb.BlobHeader.
func BlobHeaderFromProtobuf(h *pb.BlobHeader) (*BlobHeader, error) {
if h == nil {
return nil, fmt.Errorf("GetBlobHeaderFromProto: blob header is nil")

Expand Down
6 changes: 3 additions & 3 deletions core/v2/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,14 +182,14 @@ func prepareBlobs(
}
if len(inverseMap[operatorID]) < blobIndex+1 {
inverseMap[operatorID] = append(inverseMap[operatorID], &corev2.BlobShard{
BlobCertificate: certs[blobIndex],
Chunks: make(map[core.QuorumID][]*encoding.Frame),
BlobCertificate: &certs[blobIndex],
Bundles: make(map[core.QuorumID]core.Bundle),
})
}
if len(frames) == 0 {
continue
}
inverseMap[operatorID][blobIndex].Chunks[quorum] = append(inverseMap[operatorID][blobIndex].Chunks[quorum], frames...)
inverseMap[operatorID][blobIndex].Bundles[quorum] = append(inverseMap[operatorID][blobIndex].Bundles[quorum], frames...)

}
}
Expand Down
71 changes: 71 additions & 0 deletions core/v2/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,77 @@ type Batch struct {
BlobCertificates []*BlobCertificate
}

func (b *Batch) ToProtobuf() (*commonpb.Batch, error) {
if b.BatchHeader == nil {
return nil, errors.New("batch header is nil")
}

if b.BatchHeader.BatchRoot == [32]byte{} {
return nil, errors.New("batch root is empty")
}

if b.BatchHeader.ReferenceBlockNumber == 0 {
return nil, errors.New("reference block number is 0")
}

blobCerts := make([]*commonpb.BlobCertificate, len(b.BlobCertificates))
for i, cert := range b.BlobCertificates {
blobCert, err := cert.ToProtobuf()
if err != nil {
return nil, fmt.Errorf("failed to convert blob certificate to protobuf: %v", err)
}
blobCerts[i] = blobCert
}

return &commonpb.Batch{
Header: &commonpb.BatchHeader{
BatchRoot: b.BatchHeader.BatchRoot[:],
ReferenceBlockNumber: b.BatchHeader.ReferenceBlockNumber,
},
BlobCertificates: blobCerts,
}, nil
}

func BatchFromProtobuf(proto *commonpb.Batch) (*Batch, error) {
if len(proto.GetBlobCertificates()) == 0 {
return nil, errors.New("missing blob certificates in batch")
}

if proto.GetHeader() == nil {
return nil, errors.New("missing header in batch")
}

if len(proto.GetHeader().GetBatchRoot()) != 32 {
return nil, errors.New("batch root must be 32 bytes")
}

batchHeader := &BatchHeader{
BatchRoot: [32]byte(proto.GetHeader().GetBatchRoot()),
ReferenceBlockNumber: proto.GetHeader().GetReferenceBlockNumber(),
}

blobCerts := make([]*BlobCertificate, len(proto.GetBlobCertificates()))
for i, cert := range proto.GetBlobCertificates() {
blobHeader, err := NewBlobHeader(cert.GetBlobHeader())
if err != nil {
return nil, fmt.Errorf("failed to create blob header: %v", err)
}

blobCerts[i] = &BlobCertificate{
BlobHeader: blobHeader,
RelayKeys: make([]RelayKey, len(cert.GetRelays())),
}
for j, r := range cert.GetRelays() {
blobCerts[i].RelayKeys[j] = RelayKey(r)
}
}

return &Batch{
BatchHeader: batchHeader,
BlobCertificates: blobCerts,
}, nil
}

type Attestation struct {
*BatchHeader

Expand Down
67 changes: 67 additions & 0 deletions core/v2/types_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package v2_test

import (
"math/big"
"testing"

"github.com/Layr-Labs/eigenda/core"
v2 "github.com/Layr-Labs/eigenda/core/v2"
"github.com/Layr-Labs/eigenda/encoding/utils/codec"
"github.com/stretchr/testify/assert"
)

func TestConvertBatchToFromProtobuf(t *testing.T) {
data := codec.ConvertByPaddingEmptyByte(GETTYSBURG_ADDRESS_BYTES)
commitments, err := p.GetCommitments(data)
if err != nil {
t.Fatal(err)
}

bh0 := &v2.BlobHeader{
BlobVersion: 0,
BlobCommitments: commitments,
QuorumNumbers: []core.QuorumID{0, 1},
PaymentMetadata: core.PaymentMetadata{
AccountID: "0x123",
BinIndex: 5,
CumulativePayment: big.NewInt(100),
},
Signature: []byte{1, 2, 3},
}
bh1 := &v2.BlobHeader{
BlobVersion: 0,
BlobCommitments: commitments,
QuorumNumbers: []core.QuorumID{0, 1},
PaymentMetadata: core.PaymentMetadata{
AccountID: "0x456",
BinIndex: 6,
CumulativePayment: big.NewInt(200),
},
Signature: []byte{1, 2, 3},
}

blobCert0 := &v2.BlobCertificate{
BlobHeader: bh0,
RelayKeys: []v2.RelayKey{0, 1},
}
blobCert1 := &v2.BlobCertificate{
BlobHeader: bh1,
RelayKeys: []v2.RelayKey{2, 3},
}

batch := &v2.Batch{
BatchHeader: &v2.BatchHeader{
BatchRoot: [32]byte{1, 1, 1},
ReferenceBlockNumber: 100,
},
BlobCertificates: []*v2.BlobCertificate{blobCert0, blobCert1},
}

pb, err := batch.ToProtobuf()
assert.NoError(t, err)

newBatch, err := v2.BatchFromProtobuf(pb)
assert.NoError(t, err)

assert.Equal(t, batch, newBatch)
}
17 changes: 8 additions & 9 deletions core/v2/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ var (
)

type BlobShard struct {
BlobCertificate
Chunks map[core.QuorumID][]*encoding.Frame
*BlobCertificate
Bundles core.Bundles
}

// shardValidator implements the validation logic that a DA node should apply to its received data
Expand Down Expand Up @@ -52,18 +52,17 @@ func (v *ShardValidator) validateBlobQuorum(quorum core.QuorumID, blob *BlobShar
if assignment.NumChunks == 0 {
return nil, nil, fmt.Errorf("%w: operator %s has no chunks in quorum %d", ErrBlobQuorumSkip, v.operatorID.Hex(), quorum)
}
if assignment.NumChunks != uint32(len(blob.Chunks[quorum])) {
return nil, nil, fmt.Errorf("number of chunks (%d) does not match assignment (%d) for quorum %d", len(blob.Chunks[quorum]), assignment.NumChunks, quorum)
if assignment.NumChunks != uint32(len(blob.Bundles[quorum])) {
return nil, nil, fmt.Errorf("number of chunks (%d) does not match assignment (%d) for quorum %d", len(blob.Bundles[quorum]), assignment.NumChunks, quorum)
}

// Validate the chunkLength against the confirmation and adversary threshold parameters
// Get the chunk length
chunkLength, err := GetChunkLength(blob.BlobHeader.BlobVersion, uint32(blob.BlobHeader.BlobCommitments.Length))
if err != nil {
return nil, nil, fmt.Errorf("invalid chunk length: %w", err)
}

// Get the chunk length
chunks := blob.Chunks[quorum]
chunks := blob.Bundles[quorum]
for _, chunk := range chunks {
if uint32(chunk.Length()) != chunkLength {
return nil, nil, fmt.Errorf("%w: chunk length (%d) does not match quorum header (%d) for quorum %d", ErrChunkLengthMismatch, chunk.Length(), chunkLength, quorum)
Expand All @@ -79,8 +78,8 @@ func (v *ShardValidator) ValidateBlobs(ctx context.Context, blobs []*BlobShard,
blobCommitmentList := make([]encoding.BlobCommitments, len(blobs))

for k, blob := range blobs {
if len(blob.Chunks) != len(blob.BlobHeader.QuorumNumbers) {
return fmt.Errorf("number of bundles (%d) does not match number of quorums (%d)", len(blob.Chunks), len(blob.BlobHeader.QuorumNumbers))
if len(blob.Bundles) != len(blob.BlobHeader.QuorumNumbers) {
return fmt.Errorf("number of bundles (%d) does not match number of quorums (%d)", len(blob.Bundles), len(blob.BlobHeader.QuorumNumbers))
}

state, err := v.chainState.GetOperatorState(ctx, uint(referenceBlockNumber), blob.BlobHeader.QuorumNumbers)
Expand Down
4 changes: 2 additions & 2 deletions node/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func (s *Server) rebuildMerkleTree(batchHeaderHash [32]byte) (*merkletree.Merkle
return nil, err
}

blobHeader, err := core.BlobHeaderFromProto(&protoBlobHeader)
blobHeader, err := core.BlobHeaderFromProtobuf(&protoBlobHeader)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -355,7 +355,7 @@ func (s *Server) getBlobHeader(ctx context.Context, batchHeaderHash [32]byte, bl
return nil, nil, err
}

blobHeader, err := core.BlobHeaderFromProto(&protoBlobHeader)
blobHeader, err := core.BlobHeaderFromProtobuf(&protoBlobHeader)
if err != nil {
return nil, nil, err
}
Expand Down
20 changes: 10 additions & 10 deletions node/grpc/server_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package grpc
import (
"context"
"runtime"
"sync"

"github.com/Layr-Labs/eigenda/api"
pb "github.com/Layr-Labs/eigenda/api/grpc/node/v2"
Expand All @@ -18,23 +17,24 @@ type ServerV2 struct {
pb.UnimplementedDispersalServer
pb.UnimplementedRetrievalServer

node *node.Node
config *node.Config
logger logging.Logger

config *node.Config
node *node.Node
ratelimiter common.RateLimiter

mu *sync.Mutex
logger logging.Logger
}

// NewServerV2 creates a new Server instance with the provided parameters.
func NewServerV2(config *node.Config, node *node.Node, logger logging.Logger, ratelimiter common.RateLimiter) *ServerV2 {
func NewServerV2(
config *node.Config,
node *node.Node,
logger logging.Logger,
ratelimiter common.RateLimiter,
) *ServerV2 {
return &ServerV2{
config: config,
logger: logger,
node: node,
ratelimiter: ratelimiter,
mu: &sync.Mutex{},
logger: logger,
}
}

Expand Down
13 changes: 12 additions & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/prometheus/client_golang/prometheus"

"github.com/Layr-Labs/eigenda/api/clients"
"github.com/Layr-Labs/eigenda/api/grpc/node"
"github.com/Layr-Labs/eigenda/common/geth"
"github.com/Layr-Labs/eigenda/core"
Expand Down Expand Up @@ -62,12 +63,19 @@ type Node struct {
OperatorSocketsFilterer indexer.OperatorSocketsFilterer
ChainID *big.Int

RelayClient clients.RelayClient

mu sync.Mutex
CurrentSocket string
}

// NewNode creates a new Node with the provided config.
func NewNode(reg *prometheus.Registry, config *Config, pubIPProvider pubip.Provider, logger logging.Logger) (*Node, error) {
func NewNode(
reg *prometheus.Registry,
config *Config,
pubIPProvider pubip.Provider,
logger logging.Logger,
) (*Node, error) {
// Setup metrics
// sdkClients, err := buildSdkClients(config, logger)
// if err != nil {
Expand Down Expand Up @@ -160,6 +168,8 @@ func NewNode(reg *prometheus.Registry, config *Config, pubIPProvider pubip.Provi
"quorumIDs", fmt.Sprint(config.QuorumIDList), "registerNodeAtStart", config.RegisterNodeAtStart, "pubIPCheckInterval", config.PubIPCheckInterval,
"eigenDAServiceManagerAddr", config.EigenDAServiceManagerAddr, "blockStaleMeasure", blockStaleMeasure, "storeDurationBlocks", storeDurationBlocks, "enableGnarkBundleEncoding", config.EnableGnarkBundleEncoding)

var relayClient clients.RelayClient
// Create a new relay client with relay addresses onchain
return &Node{
Config: config,
Logger: nodeLogger,
Expand All @@ -173,6 +183,7 @@ func NewNode(reg *prometheus.Registry, config *Config, pubIPProvider pubip.Provi
PubIPProvider: pubIPProvider,
OperatorSocketsFilterer: socketsFilterer,
ChainID: chainID,
RelayClient: relayClient,
}, nil
}

Expand Down
Loading

0 comments on commit 72357a9

Please sign in to comment.