diff --git a/api/clients/mock/relay_client.go b/api/clients/mock/relay_client.go index 68f09b57a3..b268a430fe 100644 --- a/api/clients/mock/relay_client.go +++ b/api/clients/mock/relay_client.go @@ -20,11 +20,17 @@ func NewRelayClient() *MockRelayClient { func (c *MockRelayClient) GetBlob(ctx context.Context, relayKey corev2.RelayKey, blobKey corev2.BlobKey) ([]byte, error) { args := c.Called(blobKey) + if args.Get(0) == nil { + return nil, args.Error(1) + } return args.Get(0).([]byte), args.Error(1) } func (c *MockRelayClient) GetChunksByRange(ctx context.Context, relayKey corev2.RelayKey, requests []*clients.ChunkRequestByRange) ([][]byte, error) { - args := c.Called() + args := c.Called(ctx, relayKey, requests) + if args.Get(0) == nil { + return nil, args.Error(1) + } return args.Get(0).([][]byte), args.Error(1) } diff --git a/api/clients/node_client.go b/api/clients/node_client.go index bbf0a89a77..7166540ef5 100644 --- a/api/clients/node_client.go +++ b/api/clients/node_client.go @@ -63,7 +63,7 @@ func (c client) GetBlobHeader( return nil, nil, err } - blobHeader, err := core.BlobHeaderFromProto(reply.GetBlobHeader()) + blobHeader, err := core.BlobHeaderFromProtobuf(reply.GetBlobHeader()) if err != nil { return nil, nil, err } diff --git a/core/serialization.go b/core/serialization.go index 457879d9b6..00ae61f3d4 100644 --- a/core/serialization.go +++ b/core/serialization.go @@ -412,8 +412,8 @@ func BatchHeaderFromProtobuf(in *pb.BatchHeader) (*BatchHeader, error) { }, nil } -// BlobHeaderFromProto constructs a core.BlobHeader from a proto of pb.BlobHeader. -func BlobHeaderFromProto(h *pb.BlobHeader) (*BlobHeader, error) { +// BlobHeaderFromProtobuf constructs a core.BlobHeader from a proto of pb.BlobHeader. +func BlobHeaderFromProtobuf(h *pb.BlobHeader) (*BlobHeader, error) { if h == nil { return nil, fmt.Errorf("GetBlobHeaderFromProto: blob header is nil") diff --git a/core/v2/core_test.go b/core/v2/core_test.go index c119b30c75..83c26a52b9 100644 --- a/core/v2/core_test.go +++ b/core/v2/core_test.go @@ -182,14 +182,14 @@ func prepareBlobs( } if len(inverseMap[operatorID]) < blobIndex+1 { inverseMap[operatorID] = append(inverseMap[operatorID], &corev2.BlobShard{ - BlobCertificate: certs[blobIndex], - Chunks: make(map[core.QuorumID][]*encoding.Frame), + BlobCertificate: &certs[blobIndex], + Bundles: make(map[core.QuorumID]core.Bundle), }) } if len(frames) == 0 { continue } - inverseMap[operatorID][blobIndex].Chunks[quorum] = append(inverseMap[operatorID][blobIndex].Chunks[quorum], frames...) + inverseMap[operatorID][blobIndex].Bundles[quorum] = append(inverseMap[operatorID][blobIndex].Bundles[quorum], frames...) } } diff --git a/core/v2/types.go b/core/v2/types.go index 9e01049fa5..f8393fb200 100644 --- a/core/v2/types.go +++ b/core/v2/types.go @@ -211,6 +211,77 @@ type Batch struct { BlobCertificates []*BlobCertificate } +func (b *Batch) ToProtobuf() (*commonpb.Batch, error) { + if b.BatchHeader == nil { + return nil, errors.New("batch header is nil") + } + + if b.BatchHeader.BatchRoot == [32]byte{} { + return nil, errors.New("batch root is empty") + } + + if b.BatchHeader.ReferenceBlockNumber == 0 { + return nil, errors.New("reference block number is 0") + } + + blobCerts := make([]*commonpb.BlobCertificate, len(b.BlobCertificates)) + for i, cert := range b.BlobCertificates { + blobCert, err := cert.ToProtobuf() + if err != nil { + return nil, fmt.Errorf("failed to convert blob certificate to protobuf: %v", err) + } + blobCerts[i] = blobCert + } + + return &commonpb.Batch{ + Header: &commonpb.BatchHeader{ + BatchRoot: b.BatchHeader.BatchRoot[:], + ReferenceBlockNumber: b.BatchHeader.ReferenceBlockNumber, + }, + BlobCertificates: blobCerts, + }, nil +} + +func BatchFromProtobuf(proto *commonpb.Batch) (*Batch, error) { + if len(proto.GetBlobCertificates()) == 0 { + return nil, errors.New("missing blob certificates in batch") + } + + if proto.GetHeader() == nil { + return nil, errors.New("missing header in batch") + } + + if len(proto.GetHeader().GetBatchRoot()) != 32 { + return nil, errors.New("batch root must be 32 bytes") + } + + batchHeader := &BatchHeader{ + BatchRoot: [32]byte(proto.GetHeader().GetBatchRoot()), + ReferenceBlockNumber: proto.GetHeader().GetReferenceBlockNumber(), + } + + blobCerts := make([]*BlobCertificate, len(proto.GetBlobCertificates())) + for i, cert := range proto.GetBlobCertificates() { + blobHeader, err := NewBlobHeader(cert.GetBlobHeader()) + if err != nil { + return nil, fmt.Errorf("failed to create blob header: %v", err) + } + + blobCerts[i] = &BlobCertificate{ + BlobHeader: blobHeader, + RelayKeys: make([]RelayKey, len(cert.GetRelays())), + } + for j, r := range cert.GetRelays() { + blobCerts[i].RelayKeys[j] = RelayKey(r) + } + } + + return &Batch{ + BatchHeader: batchHeader, + BlobCertificates: blobCerts, + }, nil +} + type Attestation struct { *BatchHeader diff --git a/core/v2/types_test.go b/core/v2/types_test.go new file mode 100644 index 0000000000..38425d73d6 --- /dev/null +++ b/core/v2/types_test.go @@ -0,0 +1,67 @@ +package v2_test + +import ( + "math/big" + "testing" + + "github.com/Layr-Labs/eigenda/core" + v2 "github.com/Layr-Labs/eigenda/core/v2" + "github.com/Layr-Labs/eigenda/encoding/utils/codec" + "github.com/stretchr/testify/assert" +) + +func TestConvertBatchToFromProtobuf(t *testing.T) { + data := codec.ConvertByPaddingEmptyByte(GETTYSBURG_ADDRESS_BYTES) + commitments, err := p.GetCommitments(data) + if err != nil { + t.Fatal(err) + } + + bh0 := &v2.BlobHeader{ + BlobVersion: 0, + BlobCommitments: commitments, + QuorumNumbers: []core.QuorumID{0, 1}, + PaymentMetadata: core.PaymentMetadata{ + AccountID: "0x123", + BinIndex: 5, + CumulativePayment: big.NewInt(100), + }, + Signature: []byte{1, 2, 3}, + } + bh1 := &v2.BlobHeader{ + BlobVersion: 0, + BlobCommitments: commitments, + QuorumNumbers: []core.QuorumID{0, 1}, + PaymentMetadata: core.PaymentMetadata{ + AccountID: "0x456", + BinIndex: 6, + CumulativePayment: big.NewInt(200), + }, + Signature: []byte{1, 2, 3}, + } + + blobCert0 := &v2.BlobCertificate{ + BlobHeader: bh0, + RelayKeys: []v2.RelayKey{0, 1}, + } + blobCert1 := &v2.BlobCertificate{ + BlobHeader: bh1, + RelayKeys: []v2.RelayKey{2, 3}, + } + + batch := &v2.Batch{ + BatchHeader: &v2.BatchHeader{ + BatchRoot: [32]byte{1, 1, 1}, + ReferenceBlockNumber: 100, + }, + BlobCertificates: []*v2.BlobCertificate{blobCert0, blobCert1}, + } + + pb, err := batch.ToProtobuf() + assert.NoError(t, err) + + newBatch, err := v2.BatchFromProtobuf(pb) + assert.NoError(t, err) + + assert.Equal(t, batch, newBatch) +} diff --git a/core/v2/validator.go b/core/v2/validator.go index de0eab7de5..b2c7a2bedd 100644 --- a/core/v2/validator.go +++ b/core/v2/validator.go @@ -16,8 +16,8 @@ var ( ) type BlobShard struct { - BlobCertificate - Chunks map[core.QuorumID][]*encoding.Frame + *BlobCertificate + Bundles core.Bundles } // shardValidator implements the validation logic that a DA node should apply to its received data @@ -52,18 +52,17 @@ func (v *ShardValidator) validateBlobQuorum(quorum core.QuorumID, blob *BlobShar if assignment.NumChunks == 0 { return nil, nil, fmt.Errorf("%w: operator %s has no chunks in quorum %d", ErrBlobQuorumSkip, v.operatorID.Hex(), quorum) } - if assignment.NumChunks != uint32(len(blob.Chunks[quorum])) { - return nil, nil, fmt.Errorf("number of chunks (%d) does not match assignment (%d) for quorum %d", len(blob.Chunks[quorum]), assignment.NumChunks, quorum) + if assignment.NumChunks != uint32(len(blob.Bundles[quorum])) { + return nil, nil, fmt.Errorf("number of chunks (%d) does not match assignment (%d) for quorum %d", len(blob.Bundles[quorum]), assignment.NumChunks, quorum) } - // Validate the chunkLength against the confirmation and adversary threshold parameters + // Get the chunk length chunkLength, err := GetChunkLength(blob.BlobHeader.BlobVersion, uint32(blob.BlobHeader.BlobCommitments.Length)) if err != nil { return nil, nil, fmt.Errorf("invalid chunk length: %w", err) } - // Get the chunk length - chunks := blob.Chunks[quorum] + chunks := blob.Bundles[quorum] for _, chunk := range chunks { if uint32(chunk.Length()) != chunkLength { return nil, nil, fmt.Errorf("%w: chunk length (%d) does not match quorum header (%d) for quorum %d", ErrChunkLengthMismatch, chunk.Length(), chunkLength, quorum) @@ -79,8 +78,8 @@ func (v *ShardValidator) ValidateBlobs(ctx context.Context, blobs []*BlobShard, blobCommitmentList := make([]encoding.BlobCommitments, len(blobs)) for k, blob := range blobs { - if len(blob.Chunks) != len(blob.BlobHeader.QuorumNumbers) { - return fmt.Errorf("number of bundles (%d) does not match number of quorums (%d)", len(blob.Chunks), len(blob.BlobHeader.QuorumNumbers)) + if len(blob.Bundles) != len(blob.BlobHeader.QuorumNumbers) { + return fmt.Errorf("number of bundles (%d) does not match number of quorums (%d)", len(blob.Bundles), len(blob.BlobHeader.QuorumNumbers)) } state, err := v.chainState.GetOperatorState(ctx, uint(referenceBlockNumber), blob.BlobHeader.QuorumNumbers) diff --git a/node/grpc/server.go b/node/grpc/server.go index db6b3bfbf2..23840de74e 100644 --- a/node/grpc/server.go +++ b/node/grpc/server.go @@ -313,7 +313,7 @@ func (s *Server) rebuildMerkleTree(batchHeaderHash [32]byte) (*merkletree.Merkle return nil, err } - blobHeader, err := core.BlobHeaderFromProto(&protoBlobHeader) + blobHeader, err := core.BlobHeaderFromProtobuf(&protoBlobHeader) if err != nil { return nil, err } @@ -355,7 +355,7 @@ func (s *Server) getBlobHeader(ctx context.Context, batchHeaderHash [32]byte, bl return nil, nil, err } - blobHeader, err := core.BlobHeaderFromProto(&protoBlobHeader) + blobHeader, err := core.BlobHeaderFromProtobuf(&protoBlobHeader) if err != nil { return nil, nil, err } diff --git a/node/grpc/server_v2.go b/node/grpc/server_v2.go index b2e02027a1..ccc9b187e2 100644 --- a/node/grpc/server_v2.go +++ b/node/grpc/server_v2.go @@ -3,7 +3,6 @@ package grpc import ( "context" "runtime" - "sync" "github.com/Layr-Labs/eigenda/api" pb "github.com/Layr-Labs/eigenda/api/grpc/node/v2" @@ -18,23 +17,24 @@ type ServerV2 struct { pb.UnimplementedDispersalServer pb.UnimplementedRetrievalServer - node *node.Node - config *node.Config - logger logging.Logger - + config *node.Config + node *node.Node ratelimiter common.RateLimiter - - mu *sync.Mutex + logger logging.Logger } // NewServerV2 creates a new Server instance with the provided parameters. -func NewServerV2(config *node.Config, node *node.Node, logger logging.Logger, ratelimiter common.RateLimiter) *ServerV2 { +func NewServerV2( + config *node.Config, + node *node.Node, + logger logging.Logger, + ratelimiter common.RateLimiter, +) *ServerV2 { return &ServerV2{ config: config, - logger: logger, node: node, ratelimiter: ratelimiter, - mu: &sync.Mutex{}, + logger: logger, } } diff --git a/node/node.go b/node/node.go index 4d9690632b..d2061ff2ae 100644 --- a/node/node.go +++ b/node/node.go @@ -22,6 +22,7 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/prometheus/client_golang/prometheus" + "github.com/Layr-Labs/eigenda/api/clients" "github.com/Layr-Labs/eigenda/api/grpc/node" "github.com/Layr-Labs/eigenda/common/geth" "github.com/Layr-Labs/eigenda/core" @@ -62,12 +63,19 @@ type Node struct { OperatorSocketsFilterer indexer.OperatorSocketsFilterer ChainID *big.Int + RelayClient clients.RelayClient + mu sync.Mutex CurrentSocket string } // NewNode creates a new Node with the provided config. -func NewNode(reg *prometheus.Registry, config *Config, pubIPProvider pubip.Provider, logger logging.Logger) (*Node, error) { +func NewNode( + reg *prometheus.Registry, + config *Config, + pubIPProvider pubip.Provider, + logger logging.Logger, +) (*Node, error) { // Setup metrics // sdkClients, err := buildSdkClients(config, logger) // if err != nil { @@ -160,6 +168,8 @@ func NewNode(reg *prometheus.Registry, config *Config, pubIPProvider pubip.Provi "quorumIDs", fmt.Sprint(config.QuorumIDList), "registerNodeAtStart", config.RegisterNodeAtStart, "pubIPCheckInterval", config.PubIPCheckInterval, "eigenDAServiceManagerAddr", config.EigenDAServiceManagerAddr, "blockStaleMeasure", blockStaleMeasure, "storeDurationBlocks", storeDurationBlocks, "enableGnarkBundleEncoding", config.EnableGnarkBundleEncoding) + var relayClient clients.RelayClient + // Create a new relay client with relay addresses onchain return &Node{ Config: config, Logger: nodeLogger, @@ -173,6 +183,7 @@ func NewNode(reg *prometheus.Registry, config *Config, pubIPProvider pubip.Provi PubIPProvider: pubIPProvider, OperatorSocketsFilterer: socketsFilterer, ChainID: chainID, + RelayClient: relayClient, }, nil } diff --git a/node/node_test.go b/node/node_test.go index 46ed967158..85ae43ab64 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -2,12 +2,12 @@ package node_test import ( "context" - "fmt" "os" "runtime" "testing" "time" + clientsmock "github.com/Layr-Labs/eigenda/api/clients/mock" "github.com/Layr-Labs/eigenda/common" "github.com/Layr-Labs/eigenda/common/geth" "github.com/Layr-Labs/eigenda/core" @@ -17,12 +17,15 @@ import ( "github.com/stretchr/testify/mock" ) -var privateKey = "ac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80" -var opID = [32]byte{} +var ( + privateKey = "ac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80" + opID = [32]byte{0} +) type components struct { - node *node.Node - tx *coremock.MockWriter + node *node.Node + tx *coremock.MockWriter + relayClient *clientsmock.MockRelayClient } func newComponents(t *testing.T) *components { @@ -31,7 +34,6 @@ func newComponents(t *testing.T) *components { if err != nil { panic("failed to create a BLS Key") } - copy(opID[:], []byte(fmt.Sprintf("%d", 3))) config := &node.Config{ Timeout: 10 * time.Second, ExpirationPollIntervalSec: 1, @@ -69,19 +71,21 @@ func newComponents(t *testing.T) *components { panic("failed to create a new levelDB store") } defer os.Remove(dbPath) - + relayClient := clientsmock.NewRelayClient() return &components{ node: &node.Node{ - Config: config, - Logger: logger, - KeyPair: keyPair, - Metrics: nil, - Store: store, - ChainState: chainState, - Validator: mockVal, - Transactor: tx, + Config: config, + Logger: logger, + KeyPair: keyPair, + Metrics: nil, + Store: store, + ChainState: chainState, + Validator: mockVal, + Transactor: tx, + RelayClient: relayClient, }, - tx: tx, + tx: tx, + relayClient: relayClient, } } diff --git a/node/node_v2.go b/node/node_v2.go new file mode 100644 index 0000000000..bd1701fb4b --- /dev/null +++ b/node/node_v2.go @@ -0,0 +1,136 @@ +// These v2 methods are implemented in this separate file to keep the code organized. +// Note that there is no NodeV2 type and these methods are implemented in the existing Node type. + +package node + +import ( + "context" + "fmt" + "math/rand" + + "github.com/Layr-Labs/eigenda/api/clients" + "github.com/Layr-Labs/eigenda/core" + corev2 "github.com/Layr-Labs/eigenda/core/v2" + "github.com/gammazero/workerpool" +) + +type requestMetadata struct { + blobShardIndex int + quorum core.QuorumID +} +type relayRequest struct { + chunkRequests []*clients.ChunkRequestByRange + metadata []*requestMetadata +} +type response struct { + metadata []*requestMetadata + bundles [][]byte + err error +} + +type RawBundles struct { + BlobCertificate *corev2.BlobCertificate + Bundles map[core.QuorumID][]byte +} + +func (n *Node) DownloadBundles(ctx context.Context, batch *corev2.Batch) ([]*corev2.BlobShard, []*RawBundles, error) { + if n.RelayClient == nil { + return nil, nil, fmt.Errorf("relay client is not set") + } + + operatorState, err := n.ChainState.GetOperatorStateByOperator(ctx, uint(batch.BatchHeader.ReferenceBlockNumber), n.Config.ID) + if err != nil { + return nil, nil, err + } + + blobShards := make([]*corev2.BlobShard, len(batch.BlobCertificates)) + rawBundles := make([]*RawBundles, len(batch.BlobCertificates)) + requests := make(map[corev2.RelayKey]*relayRequest) + for i, cert := range batch.BlobCertificates { + blobKey, err := cert.BlobHeader.BlobKey() + if err != nil { + return nil, nil, fmt.Errorf("failed to get blob key: %v", err) + } + + if len(cert.RelayKeys) == 0 { + return nil, nil, fmt.Errorf("no relay keys in the certificate") + } + blobShards[i] = &corev2.BlobShard{ + BlobCertificate: cert, + Bundles: make(map[core.QuorumID]core.Bundle), + } + rawBundles[i] = &RawBundles{ + BlobCertificate: cert, + Bundles: make(map[core.QuorumID][]byte), + } + relayIndex := rand.Intn(len(cert.RelayKeys)) + relayKey := cert.RelayKeys[relayIndex] + for _, quorum := range cert.BlobHeader.QuorumNumbers { + assgn, err := corev2.GetAssignment(operatorState, batch.BlobCertificates[0].BlobHeader.BlobVersion, quorum, n.Config.ID) + if err != nil { + return nil, nil, fmt.Errorf("failed to get assignments: %v", err) + } + + req, ok := requests[relayKey] + if !ok { + req = &relayRequest{ + chunkRequests: make([]*clients.ChunkRequestByRange, 0), + metadata: make([]*requestMetadata, 0), + } + requests[relayKey] = req + } + // Chunks from one blob are requested to the same relay + req.chunkRequests = append(req.chunkRequests, &clients.ChunkRequestByRange{ + BlobKey: blobKey, + Start: assgn.StartIndex, + End: assgn.StartIndex + assgn.NumChunks, + }) + req.metadata = append(req.metadata, &requestMetadata{ + blobShardIndex: i, + quorum: quorum, + }) + } + } + + pool := workerpool.New(len(requests)) + bundleChan := make(chan response, len(requests)) + for relayKey := range requests { + relayKey := relayKey + req := requests[relayKey] + pool.Submit(func() { + bundles, err := n.RelayClient.GetChunksByRange(ctx, relayKey, req.chunkRequests) + if err != nil { + n.Logger.Errorf("failed to get chunks from relays: %v", err) + bundleChan <- response{ + metadata: nil, + bundles: nil, + err: err, + } + return + } + bundleChan <- response{ + metadata: req.metadata, + bundles: bundles, + err: nil, + } + }) + } + pool.StopWait() + + for i := 0; i < len(requests); i++ { + resp := <-bundleChan + if resp.err != nil { + return nil, nil, fmt.Errorf("failed to get chunks from relays: %v", resp.err) + } + for i, bundle := range resp.bundles { + metadata := resp.metadata[i] + blobShards[metadata.blobShardIndex].Bundles[metadata.quorum], err = new(core.Bundle).Deserialize(bundle) + if err != nil { + return nil, nil, fmt.Errorf("failed to deserialize bundle: %v", err) + } + rawBundles[metadata.blobShardIndex].Bundles[metadata.quorum] = bundle + } + } + + return blobShards, rawBundles, nil +} diff --git a/node/node_v2_test.go b/node/node_v2_test.go new file mode 100644 index 0000000000..36d6e60ce2 --- /dev/null +++ b/node/node_v2_test.go @@ -0,0 +1,316 @@ +package node_test + +import ( + "context" + "fmt" + "math/big" + "testing" + + "github.com/Layr-Labs/eigenda/api/clients" + "github.com/Layr-Labs/eigenda/core" + v2 "github.com/Layr-Labs/eigenda/core/v2" + "github.com/Layr-Labs/eigenda/encoding" + "github.com/consensys/gnark-crypto/ecc/bn254" + "github.com/consensys/gnark-crypto/ecc/bn254/fp" + "github.com/consensys/gnark-crypto/ecc/bn254/fr" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +func mockBatch(t *testing.T) ([]v2.BlobKey, *v2.Batch, []map[core.QuorumID]core.Bundle) { + commitments := mockCommitment(t) + bh0 := &v2.BlobHeader{ + BlobVersion: 0, + BlobCommitments: commitments, + QuorumNumbers: []core.QuorumID{0, 1}, + PaymentMetadata: core.PaymentMetadata{ + AccountID: "0x123", + BinIndex: 5, + CumulativePayment: big.NewInt(100), + }, + Signature: []byte{1, 2, 3}, + } + bh1 := &v2.BlobHeader{ + BlobVersion: 0, + BlobCommitments: commitments, + QuorumNumbers: []core.QuorumID{0, 1}, + PaymentMetadata: core.PaymentMetadata{ + AccountID: "0x456", + BinIndex: 6, + CumulativePayment: big.NewInt(200), + }, + Signature: []byte{1, 2, 3}, + } + bh2 := &v2.BlobHeader{ + BlobVersion: 0, + BlobCommitments: commitments, + QuorumNumbers: []core.QuorumID{1, 2}, + PaymentMetadata: core.PaymentMetadata{ + AccountID: "0x789", + BinIndex: 7, + CumulativePayment: big.NewInt(300), + }, + Signature: []byte{1, 2, 3}, + } + blobKey0, err := bh0.BlobKey() + require.NoError(t, err) + blobKey1, err := bh1.BlobKey() + require.NoError(t, err) + blobKey2, err := bh2.BlobKey() + require.NoError(t, err) + + // blobCert 0 and blobCert 2 will be downloaded from relay 0 + // blobCert 1 will be downloaded from relay 1 + blobCert0 := &v2.BlobCertificate{ + BlobHeader: bh0, + RelayKeys: []v2.RelayKey{0}, + } + blobCert1 := &v2.BlobCertificate{ + BlobHeader: bh1, + RelayKeys: []v2.RelayKey{1}, + } + blobCert2 := &v2.BlobCertificate{ + BlobHeader: bh2, + RelayKeys: []v2.RelayKey{0}, + } + + bundles0 := map[core.QuorumID]core.Bundle{ + 0: { + { + Proof: encoding.Proof(*core.NewG1Point(big.NewInt(1), big.NewInt(2)).G1Affine), + Coeffs: []fr.Element{ + {1, 2, 3, 4}, + {5, 6, 7, 8}, + }, + }, + }, + 1: { + { + Proof: encoding.Proof(*core.NewG1Point(big.NewInt(3), big.NewInt(4)).G1Affine), + Coeffs: []fr.Element{ + {9, 10, 11, 12}, + {13, 14, 15, 16}, + }, + }, + { + Proof: encoding.Proof(*core.NewG1Point(big.NewInt(5), big.NewInt(6)).G1Affine), + Coeffs: []fr.Element{ + {17, 18, 19, 20}, + {21, 22, 23, 24}, + }, + }, + }, + } + bundles1 := map[core.QuorumID]core.Bundle{ + 0: { + { + Proof: encoding.Proof(*core.NewG1Point(big.NewInt(7), big.NewInt(8)).G1Affine), + Coeffs: []fr.Element{ + {25, 26, 27, 28}, + {29, 30, 31, 32}, + }, + }, + { + Proof: encoding.Proof(*core.NewG1Point(big.NewInt(9), big.NewInt(10)).G1Affine), + Coeffs: []fr.Element{ + {33, 34, 35, 36}, + {37, 38, 39, 40}, + }, + }, + }, + 1: { + { + Proof: encoding.Proof(*core.NewG1Point(big.NewInt(11), big.NewInt(12)).G1Affine), + Coeffs: []fr.Element{ + {41, 42, 43, 44}, + {45, 46, 47, 48}, + }, + }, + }, + } + bundles2 := map[core.QuorumID]core.Bundle{ + 1: { + { + Proof: encoding.Proof(*core.NewG1Point(big.NewInt(13), big.NewInt(14)).G1Affine), + Coeffs: []fr.Element{ + {49, 50, 51, 52}, + {53, 54, 55, 56}, + }, + }, + { + Proof: encoding.Proof(*core.NewG1Point(big.NewInt(15), big.NewInt(16)).G1Affine), + Coeffs: []fr.Element{ + {57, 58, 59, 60}, + {61, 62, 63, 64}, + }, + }, + }, + 2: { + { + Proof: encoding.Proof(*core.NewG1Point(big.NewInt(17), big.NewInt(18)).G1Affine), + Coeffs: []fr.Element{ + {65, 66, 67, 68}, + {69, 70, 71, 72}, + }, + }, + }, + } + + return []v2.BlobKey{blobKey0, blobKey1, blobKey2}, &v2.Batch{ + BatchHeader: &v2.BatchHeader{ + BatchRoot: [32]byte{1, 1, 1}, + ReferenceBlockNumber: 100, + }, + BlobCertificates: []*v2.BlobCertificate{blobCert0, blobCert1, blobCert2}, + }, []map[core.QuorumID]core.Bundle{bundles0, bundles1, bundles2} +} + +func TestDownloadBundles(t *testing.T) { + c := newComponents(t) + ctx := context.Background() + blobKeys, batch, bundles := mockBatch(t) + blobCerts := batch.BlobCertificates + + bundles00Bytes, err := bundles[0][0].Serialize() + require.NoError(t, err) + bundles01Bytes, err := bundles[0][1].Serialize() + require.NoError(t, err) + bundles10Bytes, err := bundles[1][0].Serialize() + require.NoError(t, err) + bundles11Bytes, err := bundles[1][1].Serialize() + require.NoError(t, err) + bundles21Bytes, err := bundles[2][1].Serialize() + require.NoError(t, err) + bundles22Bytes, err := bundles[2][2].Serialize() + require.NoError(t, err) + c.relayClient.On("GetChunksByRange", mock.Anything, v2.RelayKey(0), mock.Anything).Return([][]byte{bundles00Bytes, bundles01Bytes, bundles21Bytes, bundles22Bytes}, nil).Run(func(args mock.Arguments) { + requests := args.Get(2).([]*clients.ChunkRequestByRange) + require.Len(t, requests, 4) + require.Equal(t, blobKeys[0], requests[0].BlobKey) + require.Equal(t, blobKeys[0], requests[1].BlobKey) + require.Equal(t, blobKeys[2], requests[2].BlobKey) + require.Equal(t, blobKeys[2], requests[3].BlobKey) + }) + c.relayClient.On("GetChunksByRange", mock.Anything, v2.RelayKey(1), mock.Anything).Return([][]byte{bundles10Bytes, bundles11Bytes}, nil).Run(func(args mock.Arguments) { + requests := args.Get(2).([]*clients.ChunkRequestByRange) + require.Len(t, requests, 2) + require.Equal(t, blobKeys[1], requests[0].BlobKey) + require.Equal(t, blobKeys[1], requests[1].BlobKey) + }) + blobShards, rawBundles, err := c.node.DownloadBundles(ctx, batch) + require.NoError(t, err) + require.Len(t, blobShards, 3) + require.Equal(t, blobCerts[0], blobShards[0].BlobCertificate) + require.Equal(t, blobCerts[1], blobShards[1].BlobCertificate) + require.Equal(t, blobCerts[2], blobShards[2].BlobCertificate) + require.Contains(t, blobShards[0].Bundles, core.QuorumID(0)) + require.Contains(t, blobShards[0].Bundles, core.QuorumID(1)) + require.Contains(t, blobShards[1].Bundles, core.QuorumID(0)) + require.Contains(t, blobShards[1].Bundles, core.QuorumID(1)) + require.Contains(t, blobShards[2].Bundles, core.QuorumID(1)) + require.Contains(t, blobShards[2].Bundles, core.QuorumID(2)) + bundleEqual(t, bundles[0][0], blobShards[0].Bundles[0]) + bundleEqual(t, bundles[0][1], blobShards[0].Bundles[1]) + bundleEqual(t, bundles[1][0], blobShards[1].Bundles[0]) + bundleEqual(t, bundles[1][1], blobShards[1].Bundles[1]) + bundleEqual(t, bundles[2][1], blobShards[2].Bundles[1]) + bundleEqual(t, bundles[2][2], blobShards[2].Bundles[2]) + + require.Len(t, rawBundles, 3) + require.Equal(t, blobCerts[0], rawBundles[0].BlobCertificate) + require.Equal(t, blobCerts[1], rawBundles[1].BlobCertificate) + require.Equal(t, blobCerts[2], rawBundles[2].BlobCertificate) + require.Contains(t, rawBundles[0].Bundles, core.QuorumID(0)) + require.Contains(t, rawBundles[0].Bundles, core.QuorumID(1)) + require.Contains(t, rawBundles[1].Bundles, core.QuorumID(0)) + require.Contains(t, rawBundles[1].Bundles, core.QuorumID(1)) + require.Contains(t, rawBundles[2].Bundles, core.QuorumID(1)) + require.Contains(t, rawBundles[2].Bundles, core.QuorumID(2)) + + require.Equal(t, bundles00Bytes, rawBundles[0].Bundles[0]) + require.Equal(t, bundles01Bytes, rawBundles[0].Bundles[1]) + require.Equal(t, bundles10Bytes, rawBundles[1].Bundles[0]) + require.Equal(t, bundles11Bytes, rawBundles[1].Bundles[1]) + require.Equal(t, bundles21Bytes, rawBundles[2].Bundles[1]) + require.Equal(t, bundles22Bytes, rawBundles[2].Bundles[2]) +} + +func TestDownloadBundlesFail(t *testing.T) { + c := newComponents(t) + ctx := context.Background() + blobKeys, batch, bundles := mockBatch(t) + + bundles00Bytes, err := bundles[0][0].Serialize() + require.NoError(t, err) + bundles01Bytes, err := bundles[0][1].Serialize() + require.NoError(t, err) + bundles21Bytes, err := bundles[2][1].Serialize() + require.NoError(t, err) + bundles22Bytes, err := bundles[2][2].Serialize() + require.NoError(t, err) + c.relayClient.On("GetChunksByRange", mock.Anything, v2.RelayKey(0), mock.Anything).Return([][]byte{bundles00Bytes, bundles01Bytes, bundles21Bytes, bundles22Bytes}, nil).Run(func(args mock.Arguments) { + requests := args.Get(2).([]*clients.ChunkRequestByRange) + require.Len(t, requests, 4) + require.Equal(t, blobKeys[0], requests[0].BlobKey) + require.Equal(t, blobKeys[0], requests[1].BlobKey) + require.Equal(t, blobKeys[2], requests[2].BlobKey) + require.Equal(t, blobKeys[2], requests[3].BlobKey) + }) + relayServerError := fmt.Errorf("relay server error") + c.relayClient.On("GetChunksByRange", mock.Anything, v2.RelayKey(1), mock.Anything).Return(nil, relayServerError).Run(func(args mock.Arguments) { + requests := args.Get(2).([]*clients.ChunkRequestByRange) + require.Len(t, requests, 2) + require.Equal(t, blobKeys[1], requests[0].BlobKey) + require.Equal(t, blobKeys[1], requests[1].BlobKey) + }) + + blobShards, rawBundles, err := c.node.DownloadBundles(ctx, batch) + require.Error(t, err) + require.Nil(t, blobShards) + require.Nil(t, rawBundles) +} + +func mockCommitment(t *testing.T) encoding.BlobCommitments { + var X1, Y1 fp.Element + X1 = *X1.SetBigInt(big.NewInt(1)) + Y1 = *Y1.SetBigInt(big.NewInt(2)) + + var lengthXA0, lengthXA1, lengthYA0, lengthYA1 fp.Element + _, err := lengthXA0.SetString("10857046999023057135944570762232829481370756359578518086990519993285655852781") + require.NoError(t, err) + _, err = lengthXA1.SetString("11559732032986387107991004021392285783925812861821192530917403151452391805634") + require.NoError(t, err) + _, err = lengthYA0.SetString("8495653923123431417604973247489272438418190587263600148770280649306958101930") + require.NoError(t, err) + _, err = lengthYA1.SetString("4082367875863433681332203403145435568316851327593401208105741076214120093531") + require.NoError(t, err) + + var lengthProof, lengthCommitment bn254.G2Affine + lengthProof.X.A0 = lengthXA0 + lengthProof.X.A1 = lengthXA1 + lengthProof.Y.A0 = lengthYA0 + lengthProof.Y.A1 = lengthYA1 + + lengthCommitment = lengthProof + + return encoding.BlobCommitments{ + Commitment: &encoding.G1Commitment{ + X: X1, + Y: Y1, + }, + LengthCommitment: (*encoding.G2Commitment)(&lengthCommitment), + LengthProof: (*encoding.G2Commitment)(&lengthProof), + Length: 10, + } +} + +func bundleEqual(t *testing.T, expected, actual core.Bundle) { + for i := range expected { + frameEqual(t, expected[i], actual[i]) + } +} + +func frameEqual(t *testing.T, expected, actual *encoding.Frame) { + require.Equal(t, expected.Proof.Bytes(), actual.Proof.Bytes()) + require.Equal(t, expected.Coeffs, actual.Coeffs) +} diff --git a/node/utils.go b/node/utils.go index 6f2900317d..b2bd8b8d55 100644 --- a/node/utils.go +++ b/node/utils.go @@ -25,7 +25,7 @@ func GetBlobMessages(pbBlobs []*pb.Blob, numWorkers int) ([]*core.BlobMessage, e i := i blob := blob pool.Submit(func() { - blobHeader, err := core.BlobHeaderFromProto(blob.GetHeader()) + blobHeader, err := core.BlobHeaderFromProtobuf(blob.GetHeader()) if err != nil { resultChan <- err return diff --git a/test/integration_test.go b/test/integration_test.go index 914fda9b84..71c6ced61e 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -648,7 +648,7 @@ func TestDispersalAndRetrieval(t *testing.T) { assert.Greater(t, headerReply.GetBlobHeader().GetQuorumHeaders()[0].GetChunkLength(), uint32(0)) if blobHeader == nil { - blobHeader, err = core.BlobHeaderFromProto(headerReply.GetBlobHeader()) + blobHeader, err = core.BlobHeaderFromProtobuf(headerReply.GetBlobHeader()) assert.NoError(t, err) }