From 03f712af7c0b9bcc863d3d778de82ac6d71515a0 Mon Sep 17 00:00:00 2001 From: Jian Xiao Date: Tue, 13 Aug 2024 21:52:03 +0000 Subject: [PATCH 01/19] [1/N][zero serialization] Make Batcher operate on chunks without deserialization --- core/data.go | 4 +++ disperser/batcher/encoded_blob_store.go | 4 +-- disperser/batcher/encoding_streamer.go | 22 +++++++----- disperser/batcher/encoding_streamer_test.go | 2 +- disperser/batcher/grpc/dispatcher.go | 39 +++++++++++++++------ disperser/encoder/client.go | 18 +++++----- disperser/encoder_client.go | 3 +- disperser/local_encoder_client.go | 19 ++++++++-- disperser/mock/encoder.go | 7 ++-- 9 files changed, 80 insertions(+), 38 deletions(-) diff --git a/core/data.go b/core/data.go index 7829ef3ad6..39325f2721 100644 --- a/core/data.go +++ b/core/data.go @@ -266,6 +266,8 @@ type BatchHeader struct { type EncodedBlob struct { BlobHeader *BlobHeader BundlesByOperator map[OperatorID]Bundles + // EncodedBundlesByOperator is bundles in encoded format (not deserialized) + EncodedBundlesByOperator map[OperatorID]map[QuorumID]*ChunksData } // A Bundle is the collection of chunks associated with a single blob, for a single operator and a single quorum. @@ -278,6 +280,8 @@ type Bundles map[QuorumID]Bundle type BlobMessage struct { BlobHeader *BlobHeader Bundles Bundles + // EncodedBundles is bundles in encoded format (not deserialized) + EncodedBundles map[QuorumID]*ChunksData } func (b Bundle) Size() uint64 { diff --git a/disperser/batcher/encoded_blob_store.go b/disperser/batcher/encoded_blob_store.go index 5ce98d6d71..fcf8baa21d 100644 --- a/disperser/batcher/encoded_blob_store.go +++ b/disperser/batcher/encoded_blob_store.go @@ -29,7 +29,7 @@ type EncodingResult struct { ReferenceBlockNumber uint BlobQuorumInfo *core.BlobQuorumInfo Commitment *encoding.BlobCommitments - Chunks []*encoding.Frame + ChunksData *core.ChunksData Assignments map[core.OperatorID]core.Assignment } @@ -197,5 +197,5 @@ func getRequestID(key disperser.BlobKey, quorumID core.QuorumID) requestID { // getChunksSize returns the total size of all the chunks in the encoded result in bytes func getChunksSize(result *EncodingResult) uint64 { - return core.Bundle(result.Chunks).Size() + return result.ChunksData.Size() } diff --git a/disperser/batcher/encoding_streamer.go b/disperser/batcher/encoding_streamer.go index 3b22a3f829..5a0fac572c 100644 --- a/disperser/batcher/encoding_streamer.go +++ b/disperser/batcher/encoding_streamer.go @@ -386,7 +386,7 @@ func (e *EncodingStreamer) RequestEncodingForBlob(ctx context.Context, metadata ReferenceBlockNumber: referenceBlockNumber, BlobQuorumInfo: res.BlobQuorumInfo, Commitment: commits, - Chunks: chunks, + ChunksData: chunks, Assignments: res.Assignments, }, Err: nil, @@ -489,12 +489,14 @@ func (e *EncodingStreamer) CreateMinibatch(ctx context.Context) (*batch, error) // Populate the assigned bundles for opID, assignment := range result.Assignments { - bundles, ok := encodedBlobByKey[blobKey].BundlesByOperator[opID] + bundles, ok := encodedBlobByKey[blobKey].EncodedBundlesByOperator[opID] if !ok { - encodedBlobByKey[blobKey].BundlesByOperator[opID] = make(core.Bundles) - bundles = encodedBlobByKey[blobKey].BundlesByOperator[opID] + encodedBlobByKey[blobKey].EncodedBundlesByOperator[opID] = make(map[core.QuorumID]*core.ChunksData) + bundles = encodedBlobByKey[blobKey].EncodedBundlesByOperator[opID] } - bundles[result.BlobQuorumInfo.QuorumID] = append(bundles[result.BlobQuorumInfo.QuorumID], result.Chunks[assignment.StartIndex:assignment.StartIndex+assignment.NumChunks]...) + bundles[result.BlobQuorumInfo.QuorumID].Format = result.ChunksData.Format + bundles[result.BlobQuorumInfo.QuorumID].Chunks = append(bundles[result.BlobQuorumInfo.QuorumID].Chunks, result.ChunksData.Chunks[assignment.StartIndex:assignment.StartIndex+assignment.NumChunks]...) + bundles[result.BlobQuorumInfo.QuorumID].ChunkLen = result.ChunksData.ChunkLen } blobQuorums[blobKey] = append(blobQuorums[blobKey], result.BlobQuorumInfo) @@ -638,12 +640,14 @@ func (e *EncodingStreamer) CreateBatch(ctx context.Context) (*batch, error) { // Populate the assigned bundles for opID, assignment := range result.Assignments { - bundles, ok := encodedBlobByKey[blobKey].BundlesByOperator[opID] + bundles, ok := encodedBlobByKey[blobKey].EncodedBundlesByOperator[opID] if !ok { - encodedBlobByKey[blobKey].BundlesByOperator[opID] = make(core.Bundles) - bundles = encodedBlobByKey[blobKey].BundlesByOperator[opID] + encodedBlobByKey[blobKey].EncodedBundlesByOperator[opID] = make(map[core.QuorumID]*core.ChunksData) + bundles = encodedBlobByKey[blobKey].EncodedBundlesByOperator[opID] } - bundles[result.BlobQuorumInfo.QuorumID] = append(bundles[result.BlobQuorumInfo.QuorumID], result.Chunks[assignment.StartIndex:assignment.StartIndex+assignment.NumChunks]...) + bundles[result.BlobQuorumInfo.QuorumID].Format = result.ChunksData.Format + bundles[result.BlobQuorumInfo.QuorumID].Chunks = append(bundles[result.BlobQuorumInfo.QuorumID].Chunks, result.ChunksData.Chunks[assignment.StartIndex:assignment.StartIndex+assignment.NumChunks]...) + bundles[result.BlobQuorumInfo.QuorumID].ChunkLen = result.ChunksData.ChunkLen } blobQuorums[blobKey] = append(blobQuorums[blobKey], result.BlobQuorumInfo) diff --git a/disperser/batcher/encoding_streamer_test.go b/disperser/batcher/encoding_streamer_test.go index d84ca03492..688a832f11 100644 --- a/disperser/batcher/encoding_streamer_test.go +++ b/disperser/batcher/encoding_streamer_test.go @@ -246,7 +246,7 @@ func TestStreamingEncoding(t *testing.T) { assert.NotNil(t, encodedResult.Commitment.LengthProof) assert.Greater(t, encodedResult.Commitment.Length, uint(0)) assert.Len(t, encodedResult.Assignments, numOperators) - assert.Len(t, encodedResult.Chunks, 32) + assert.Len(t, encodedResult.ChunksData, 32) isRequested = encodingStreamer.EncodedBlobstore.HasEncodingRequested(metadataKey, core.QuorumID(0), 10) assert.True(t, isRequested) count, size = encodingStreamer.EncodedBlobstore.GetEncodedResultSize() diff --git a/disperser/batcher/grpc/dispatcher.go b/disperser/batcher/grpc/dispatcher.go index 1ea3375444..3ae364874a 100644 --- a/disperser/batcher/grpc/dispatcher.go +++ b/disperser/batcher/grpc/dispatcher.go @@ -65,7 +65,7 @@ func (c *dispatcher) sendAllChunks(ctx context.Context, state *core.IndexedOpera blobMessages = append(blobMessages, &core.BlobMessage{ BlobHeader: blob.BlobHeader, // Bundles will be empty if the operator is not in the quorums blob is dispersed on - Bundles: blob.BundlesByOperator[id], + EncodedBundles: blob.EncodedBundlesByOperator[id], }) } if !hasAnyBundles { @@ -289,7 +289,7 @@ func GetStoreChunksRequest(blobMessages []*core.BlobMessage, batchHeader *core.B if err != nil { return nil, 0, err } - totalSize += int64(blob.Bundles.Size()) + totalSize += getBundlesSize(blob) } request := &node.StoreChunksRequest{ @@ -309,7 +309,7 @@ func GetStoreBlobsRequest(blobMessages []*core.BlobMessage, batchHeader *core.Ba if err != nil { return nil, 0, err } - totalSize += int64(blob.Bundles.Size()) + totalSize += getBundlesSize(blob) } request := &node.StoreBlobsRequest{ @@ -357,13 +357,20 @@ func getBlobMessage(blob *core.BlobMessage, useGnarkBundleEncoding bool) (*node. } } + var err error bundles := make([]*node.Bundle, len(quorumHeaders)) if useGnarkBundleEncoding { // the ordering of quorums in bundles must be same as in quorumHeaders for i, quorumHeader := range quorumHeaders { quorum := quorumHeader.QuorumId - if bundle, ok := blob.Bundles[uint8(quorum)]; ok { - bundleBytes, err := bundle.Serialize() + if chunksData, ok := blob.EncodedBundles[uint8(quorum)]; ok { + if chunksData.Format != core.GnarkChunkEncodingFormat { + chunksData, err = chunksData.ToGnarkFormat() + if err != nil { + return nil, err + } + } + bundleBytes, err := chunksData.FlattenToBundle() if err != nil { return nil, err } @@ -378,16 +385,18 @@ func getBlobMessage(blob *core.BlobMessage, useGnarkBundleEncoding bool) (*node. } } } else { - data, err := blob.Bundles.Serialize() - if err != nil { - return nil, err - } // the ordering of quorums in bundles must be same as in quorumHeaders for i, quorumHeader := range quorumHeaders { quorum := quorumHeader.QuorumId - if _, ok := blob.Bundles[uint8(quorum)]; ok { + if chunksData, ok := blob.EncodedBundles[uint8(quorum)]; ok { + if chunksData.Format != core.GobChunkEncodingFormat { + chunksData, err = chunksData.ToGobFormat() + if err != nil { + return nil, err + } + } bundles[i] = &node.Bundle{ - Chunks: data[quorum], + Chunks: chunksData.Chunks, } } else { bundles[i] = &node.Bundle{ @@ -417,3 +426,11 @@ func getBatchHeaderMessage(header *core.BatchHeader) *node.BatchHeader { ReferenceBlockNumber: uint32(header.ReferenceBlockNumber), } } + +func getBundlesSize(blob *core.BlobMessage) int64 { + size := int64(0) + for _, bundle := range blob.EncodedBundles { + size += int64(bundle.Size()) + } + return size +} diff --git a/disperser/encoder/client.go b/disperser/encoder/client.go index 6b3858a63b..8a72b08c85 100644 --- a/disperser/encoder/client.go +++ b/disperser/encoder/client.go @@ -5,6 +5,7 @@ import ( "fmt" "time" + "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/disperser" pb "github.com/Layr-Labs/eigenda/disperser/api/grpc/encoder" "github.com/Layr-Labs/eigenda/encoding" @@ -24,7 +25,7 @@ func NewEncoderClient(addr string, timeout time.Duration) (disperser.EncoderClie }, nil } -func (c client) EncodeBlob(ctx context.Context, data []byte, encodingParams encoding.EncodingParams) (*encoding.BlobCommitments, []*encoding.Frame, error) { +func (c client) EncodeBlob(ctx context.Context, data []byte, encodingParams encoding.EncodingParams) (*encoding.BlobCommitments, *core.ChunksData, error) { conn, err := grpc.Dial( c.addr, grpc.WithTransportCredentials(insecure.NewCredentials()), @@ -59,18 +60,17 @@ func (c client) EncodeBlob(ctx context.Context, data []byte, encodingParams enco if err != nil { return nil, nil, err } - chunks := make([]*encoding.Frame, len(reply.GetChunks())) - for i, chunk := range reply.GetChunks() { - deserialized, err := new(encoding.Frame).Deserialize(chunk) - if err != nil { - return nil, nil, err - } - chunks[i] = deserialized + chunksData := &core.ChunksData{ + Chunks: reply.GetChunks(), + // TODO(jianoaix): plumb the encoding format for the encoder server. For now it's fine + // as it's hard coded using Gob at Encoder server. + Format: core.GobChunkEncodingFormat, + ChunkLen: int(encodingParams.ChunkLength), } return &encoding.BlobCommitments{ Commitment: commitment, LengthCommitment: lengthCommitment, LengthProof: lengthProof, Length: uint(reply.GetCommitment().GetLength()), - }, chunks, nil + }, chunksData, nil } diff --git a/disperser/encoder_client.go b/disperser/encoder_client.go index 20857af9cd..daffd35136 100644 --- a/disperser/encoder_client.go +++ b/disperser/encoder_client.go @@ -3,9 +3,10 @@ package disperser import ( "context" + "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/encoding" ) type EncoderClient interface { - EncodeBlob(ctx context.Context, data []byte, encodingParams encoding.EncodingParams) (*encoding.BlobCommitments, []*encoding.Frame, error) + EncodeBlob(ctx context.Context, data []byte, encodingParams encoding.EncodingParams) (*encoding.BlobCommitments, *core.ChunksData, error) } diff --git a/disperser/local_encoder_client.go b/disperser/local_encoder_client.go index b66cf79dfd..7fabf27aa6 100644 --- a/disperser/local_encoder_client.go +++ b/disperser/local_encoder_client.go @@ -4,6 +4,7 @@ import ( "context" "sync" + "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/encoding" ) @@ -21,7 +22,7 @@ func NewLocalEncoderClient(prover encoding.Prover) *LocalEncoderClient { } } -func (m *LocalEncoderClient) EncodeBlob(ctx context.Context, data []byte, encodingParams encoding.EncodingParams) (*encoding.BlobCommitments, []*encoding.Frame, error) { +func (m *LocalEncoderClient) EncodeBlob(ctx context.Context, data []byte, encodingParams encoding.EncodingParams) (*encoding.BlobCommitments, *core.ChunksData, error) { m.mu.Lock() defer m.mu.Unlock() commits, chunks, err := m.prover.EncodeAndProve(data, encodingParams) @@ -29,5 +30,19 @@ func (m *LocalEncoderClient) EncodeBlob(ctx context.Context, data []byte, encodi return nil, nil, err } - return &commits, chunks, nil + bytes := make([][]byte, len(chunks)) + for _, c := range chunks { + serialized, err := c.Serialize() + if err != nil { + return nil, nil, err + } + bytes = append(bytes, serialized) + } + chunksData := &core.ChunksData{ + Chunks: bytes, + Format: core.GobChunkEncodingFormat, + ChunkLen: int(encodingParams.ChunkLength), + } + + return &commits, chunksData, nil } diff --git a/disperser/mock/encoder.go b/disperser/mock/encoder.go index 0aa6422434..c9d4d1babb 100644 --- a/disperser/mock/encoder.go +++ b/disperser/mock/encoder.go @@ -3,6 +3,7 @@ package mock import ( "context" + "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/disperser" "github.com/Layr-Labs/eigenda/encoding" "github.com/stretchr/testify/mock" @@ -18,15 +19,15 @@ func NewMockEncoderClient() *MockEncoderClient { return &MockEncoderClient{} } -func (m *MockEncoderClient) EncodeBlob(ctx context.Context, data []byte, encodingParams encoding.EncodingParams) (*encoding.BlobCommitments, []*encoding.Frame, error) { +func (m *MockEncoderClient) EncodeBlob(ctx context.Context, data []byte, encodingParams encoding.EncodingParams) (*encoding.BlobCommitments, *core.ChunksData, error) { args := m.Called(ctx, data, encodingParams) var commitments *encoding.BlobCommitments if args.Get(0) != nil { commitments = args.Get(0).(*encoding.BlobCommitments) } - var chunks []*encoding.Frame + var chunks *core.ChunksData if args.Get(1) != nil { - chunks = args.Get(1).([]*encoding.Frame) + chunks = args.Get(1).(*core.ChunksData) } return commitments, chunks, args.Error(2) } From 072bdedc45373517c591feaf447f3a130e9f5ad7 Mon Sep 17 00:00:00 2001 From: Jian Xiao Date: Tue, 13 Aug 2024 23:31:16 +0000 Subject: [PATCH 02/19] EncodedBlobMessage --- core/data.go | 8 ++++++++ disperser/batcher/grpc/dispatcher.go | 16 ++++++++-------- disperser/batcher/minibatcher.go | 6 +++--- disperser/disperser.go | 2 +- disperser/mock/dispatcher.go | 2 +- node/grpc/server_load_test.go | 23 ++++++++++++++++------- 6 files changed, 37 insertions(+), 20 deletions(-) diff --git a/core/data.go b/core/data.go index 39325f2721..102099c985 100644 --- a/core/data.go +++ b/core/data.go @@ -284,6 +284,14 @@ type BlobMessage struct { EncodedBundles map[QuorumID]*ChunksData } +// This is similar to BlobMessage, but keep the commitments and chunks in encoded format +// (i.e. not deserialized) +type EncodedBlobMessage struct { + // TODO(jianoaix): Change the commitments to encoded format. + BlobHeader *BlobHeader + EncodedBundles map[QuorumID]*ChunksData +} + func (b Bundle) Size() uint64 { size := uint64(0) for _, chunk := range b { diff --git a/disperser/batcher/grpc/dispatcher.go b/disperser/batcher/grpc/dispatcher.go index 3ae364874a..b429e2450c 100644 --- a/disperser/batcher/grpc/dispatcher.go +++ b/disperser/batcher/grpc/dispatcher.go @@ -52,7 +52,7 @@ func (c *dispatcher) DisperseBatch(ctx context.Context, state *core.IndexedOpera func (c *dispatcher) sendAllChunks(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, batchHeader *core.BatchHeader, update chan core.SigningMessage) { for id, op := range state.IndexedOperators { go func(op core.IndexedOperatorInfo, id core.OperatorID) { - blobMessages := make([]*core.BlobMessage, 0) + blobMessages := make([]*core.EncodedBlobMessage, 0) hasAnyBundles := false batchHeaderHash, err := batchHeader.GetBatchHeaderHash() if err != nil { @@ -62,7 +62,7 @@ func (c *dispatcher) sendAllChunks(ctx context.Context, state *core.IndexedOpera if _, ok := blob.BundlesByOperator[id]; ok { hasAnyBundles = true } - blobMessages = append(blobMessages, &core.BlobMessage{ + blobMessages = append(blobMessages, &core.EncodedBlobMessage{ BlobHeader: blob.BlobHeader, // Bundles will be empty if the operator is not in the quorums blob is dispersed on EncodedBundles: blob.EncodedBundlesByOperator[id], @@ -111,7 +111,7 @@ func (c *dispatcher) sendAllChunks(ctx context.Context, state *core.IndexedOpera } } -func (c *dispatcher) sendChunks(ctx context.Context, blobs []*core.BlobMessage, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) (*core.Signature, error) { +func (c *dispatcher) sendChunks(ctx context.Context, blobs []*core.EncodedBlobMessage, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) (*core.Signature, error) { // TODO Add secure Grpc conn, err := grpc.Dial( @@ -152,7 +152,7 @@ func (c *dispatcher) sendChunks(ctx context.Context, blobs []*core.BlobMessage, // SendBlobsToOperator sends blobs to an operator via the node's StoreBlobs endpoint // It returns the signatures of the blobs sent to the operator in the same order as the blobs // with nil values for blobs that were not attested by the operator -func (c *dispatcher) SendBlobsToOperator(ctx context.Context, blobs []*core.BlobMessage, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) ([]*core.Signature, error) { +func (c *dispatcher) SendBlobsToOperator(ctx context.Context, blobs []*core.EncodedBlobMessage, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) ([]*core.Signature, error) { // TODO Add secure Grpc conn, err := grpc.Dial( @@ -280,7 +280,7 @@ func (c *dispatcher) SendAttestBatchRequest(ctx context.Context, nodeDispersalCl return &core.Signature{G1Point: point}, nil } -func GetStoreChunksRequest(blobMessages []*core.BlobMessage, batchHeader *core.BatchHeader, useGnarkBundleEncoding bool) (*node.StoreChunksRequest, int64, error) { +func GetStoreChunksRequest(blobMessages []*core.EncodedBlobMessage, batchHeader *core.BatchHeader, useGnarkBundleEncoding bool) (*node.StoreChunksRequest, int64, error) { blobs := make([]*node.Blob, len(blobMessages)) totalSize := int64(0) for i, blob := range blobMessages { @@ -300,7 +300,7 @@ func GetStoreChunksRequest(blobMessages []*core.BlobMessage, batchHeader *core.B return request, totalSize, nil } -func GetStoreBlobsRequest(blobMessages []*core.BlobMessage, batchHeader *core.BatchHeader, useGnarkBundleEncoding bool) (*node.StoreBlobsRequest, int64, error) { +func GetStoreBlobsRequest(blobMessages []*core.EncodedBlobMessage, batchHeader *core.BatchHeader, useGnarkBundleEncoding bool) (*node.StoreBlobsRequest, int64, error) { blobs := make([]*node.Blob, len(blobMessages)) totalSize := int64(0) for i, blob := range blobMessages { @@ -320,7 +320,7 @@ func GetStoreBlobsRequest(blobMessages []*core.BlobMessage, batchHeader *core.Ba return request, totalSize, nil } -func getBlobMessage(blob *core.BlobMessage, useGnarkBundleEncoding bool) (*node.Blob, error) { +func getBlobMessage(blob *core.EncodedBlobMessage, useGnarkBundleEncoding bool) (*node.Blob, error) { if blob.BlobHeader == nil { return nil, errors.New("blob header is nil") } @@ -427,7 +427,7 @@ func getBatchHeaderMessage(header *core.BatchHeader) *node.BatchHeader { } } -func getBundlesSize(blob *core.BlobMessage) int64 { +func getBundlesSize(blob *core.EncodedBlobMessage) int64 { size := int64(0) for _, bundle := range blob.EncodedBundles { size += int64(bundle.Size()) diff --git a/disperser/batcher/minibatcher.go b/disperser/batcher/minibatcher.go index dfd13419d6..d828960ac4 100644 --- a/disperser/batcher/minibatcher.go +++ b/disperser/batcher/minibatcher.go @@ -314,16 +314,16 @@ func (b *Minibatcher) SendBlobsToOperatorWithRetries( opID core.OperatorID, maxNumRetries int, ) ([]*core.Signature, error) { - blobMessages := make([]*core.BlobMessage, 0) + blobMessages := make([]*core.EncodedBlobMessage, 0) hasAnyBundles := false for _, blob := range blobs { if _, ok := blob.BundlesByOperator[opID]; ok { hasAnyBundles = true } - blobMessages = append(blobMessages, &core.BlobMessage{ + blobMessages = append(blobMessages, &core.EncodedBlobMessage{ BlobHeader: blob.BlobHeader, // Bundles will be empty if the operator is not in the quorums blob is dispersed on - Bundles: blob.BundlesByOperator[opID], + EncodedBundles: blob.EncodedBundlesByOperator[opID], }) } if !hasAnyBundles { diff --git a/disperser/disperser.go b/disperser/disperser.go index 8603b4abfa..efee6fea3a 100644 --- a/disperser/disperser.go +++ b/disperser/disperser.go @@ -178,7 +178,7 @@ type BlobStore interface { type Dispatcher interface { DisperseBatch(context.Context, *core.IndexedOperatorState, []core.EncodedBlob, *core.BatchHeader) chan core.SigningMessage - SendBlobsToOperator(ctx context.Context, blobs []*core.BlobMessage, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) ([]*core.Signature, error) + SendBlobsToOperator(ctx context.Context, blobs []*core.EncodedBlobMessage, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) ([]*core.Signature, error) AttestBatch(ctx context.Context, state *core.IndexedOperatorState, blobHeaderHashes [][32]byte, batchHeader *core.BatchHeader) (chan core.SigningMessage, error) SendAttestBatchRequest(ctx context.Context, nodeDispersalClient node.DispersalClient, blobHeaderHashes [][32]byte, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) (*core.Signature, error) } diff --git a/disperser/mock/dispatcher.go b/disperser/mock/dispatcher.go index 743de8ebd9..e17dbd0d15 100644 --- a/disperser/mock/dispatcher.go +++ b/disperser/mock/dispatcher.go @@ -66,7 +66,7 @@ func (d *Dispatcher) DisperseBatch(ctx context.Context, state *core.IndexedOpera return update } -func (d *Dispatcher) SendBlobsToOperator(ctx context.Context, blobs []*core.BlobMessage, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) ([]*core.Signature, error) { +func (d *Dispatcher) SendBlobsToOperator(ctx context.Context, blobs []*core.EncodedBlobMessage, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) ([]*core.Signature, error) { args := d.Called(ctx, blobs, batchHeader, op) if args.Get(0) == nil { return nil, args.Error(1) diff --git a/node/grpc/server_load_test.go b/node/grpc/server_load_test.go index a0ab6bd72d..c216c0269b 100644 --- a/node/grpc/server_load_test.go +++ b/node/grpc/server_load_test.go @@ -15,14 +15,14 @@ import ( "github.com/stretchr/testify/assert" ) -func makeBatch(t *testing.T, blobSize int, numBlobs int, advThreshold, quorumThreshold int, refBlockNumber uint) (*core.BatchHeader, map[core.OperatorID][]*core.BlobMessage) { +func makeBatch(t *testing.T, blobSize int, numBlobs int, advThreshold, quorumThreshold int, refBlockNumber uint) (*core.BatchHeader, map[core.OperatorID][]*core.EncodedBlobMessage) { p, _, err := makeTestComponents() assert.NoError(t, err) asn := &core.StdAssignmentCoordinator{} blobHeaders := make([]*core.BlobHeader, numBlobs) blobChunks := make([][]*encoding.Frame, numBlobs) - blobMessagesByOp := make(map[core.OperatorID][]*core.BlobMessage) + blobMessagesByOp := make(map[core.OperatorID][]*core.EncodedBlobMessage) for i := 0; i < numBlobs; i++ { // create data ranData := make([]byte, blobSize) @@ -67,6 +67,13 @@ func makeBatch(t *testing.T, blobSize int, numBlobs int, advThreshold, quorumThr assert.NoError(t, err) blobChunks[i] = chunks + chunkBytes := make([][]byte, len(chunks)) + for _, c := range chunks { + serialized, err := c.Serialize() + assert.NotNil(t, err) + chunkBytes = append(chunkBytes, serialized) + } + // populate blob header blobHeaders[i] = &core.BlobHeader{ BlobCommitments: commits, @@ -75,11 +82,13 @@ func makeBatch(t *testing.T, blobSize int, numBlobs int, advThreshold, quorumThr // populate blob messages for opID, assignment := range quorumInfo.Assignments { - blobMessagesByOp[opID] = append(blobMessagesByOp[opID], &core.BlobMessage{ - BlobHeader: blobHeaders[i], - Bundles: make(core.Bundles), + blobMessagesByOp[opID] = append(blobMessagesByOp[opID], &core.EncodedBlobMessage{ + BlobHeader: blobHeaders[i], + EncodedBundles: make(map[core.QuorumID]*core.ChunksData), }) - blobMessagesByOp[opID][i].Bundles[0] = append(blobMessagesByOp[opID][i].Bundles[0], chunks[assignment.StartIndex:assignment.StartIndex+assignment.NumChunks]...) + blobMessagesByOp[opID][i].EncodedBundles[0].Format = core.GobChunkEncodingFormat + blobMessagesByOp[opID][i].EncodedBundles[0].ChunkLen = int(params.ChunkLength) + blobMessagesByOp[opID][i].EncodedBundles[0].Chunks = append(blobMessagesByOp[opID][i].EncodedBundles[0].Chunks, chunkBytes[assignment.StartIndex:assignment.StartIndex+assignment.NumChunks]...) } } @@ -100,7 +109,7 @@ func TestStoreChunks(t *testing.T) { batchHeader, blobMessagesByOp := makeBatch(t, 200*1024, 50, 80, 100, 1) numTotalChunks := 0 for i := range blobMessagesByOp[opID] { - numTotalChunks += len(blobMessagesByOp[opID][i].Bundles[0]) + numTotalChunks += len(blobMessagesByOp[opID][i].EncodedBundles[0].Chunks) } t.Logf("Batch numTotalChunks: %d", numTotalChunks) req, totalSize, err := dispatcher.GetStoreChunksRequest(blobMessagesByOp[opID], batchHeader, false) From 3d7993c36d640279b62c119d2665b42b8e78b2d5 Mon Sep 17 00:00:00 2001 From: Jian Xiao Date: Tue, 13 Aug 2024 23:41:04 +0000 Subject: [PATCH 03/19] fix --- core/data.go | 2 -- disperser/batcher/encoding_streamer.go | 8 ++++---- disperser/batcher/grpc/dispatcher.go | 2 +- disperser/batcher/minibatcher.go | 2 +- 4 files changed, 6 insertions(+), 8 deletions(-) diff --git a/core/data.go b/core/data.go index 102099c985..d362bb698d 100644 --- a/core/data.go +++ b/core/data.go @@ -280,8 +280,6 @@ type Bundles map[QuorumID]Bundle type BlobMessage struct { BlobHeader *BlobHeader Bundles Bundles - // EncodedBundles is bundles in encoded format (not deserialized) - EncodedBundles map[QuorumID]*ChunksData } // This is similar to BlobMessage, but keep the commitments and chunks in encoded format diff --git a/disperser/batcher/encoding_streamer.go b/disperser/batcher/encoding_streamer.go index 5a0fac572c..680b2d4193 100644 --- a/disperser/batcher/encoding_streamer.go +++ b/disperser/batcher/encoding_streamer.go @@ -482,8 +482,8 @@ func (e *EncodingStreamer) CreateMinibatch(ctx context.Context) (*batch, error) } blobHeaderByKey[blobKey] = blobHeader encodedBlobByKey[blobKey] = core.EncodedBlob{ - BlobHeader: blobHeader, - BundlesByOperator: make(map[core.OperatorID]core.Bundles), + BlobHeader: blobHeader, + EncodedBundlesByOperator: make(map[core.OperatorID]map[core.QuorumID]*core.ChunksData), } } @@ -633,8 +633,8 @@ func (e *EncodingStreamer) CreateBatch(ctx context.Context) (*batch, error) { } blobHeaderByKey[blobKey] = blobHeader encodedBlobByKey[blobKey] = core.EncodedBlob{ - BlobHeader: blobHeader, - BundlesByOperator: make(map[core.OperatorID]core.Bundles), + BlobHeader: blobHeader, + EncodedBundlesByOperator: make(map[core.OperatorID]map[core.QuorumID]*core.ChunksData), } } diff --git a/disperser/batcher/grpc/dispatcher.go b/disperser/batcher/grpc/dispatcher.go index b429e2450c..6597492722 100644 --- a/disperser/batcher/grpc/dispatcher.go +++ b/disperser/batcher/grpc/dispatcher.go @@ -59,7 +59,7 @@ func (c *dispatcher) sendAllChunks(ctx context.Context, state *core.IndexedOpera return } for _, blob := range blobs { - if _, ok := blob.BundlesByOperator[id]; ok { + if _, ok := blob.EncodedBundlesByOperator[id]; ok { hasAnyBundles = true } blobMessages = append(blobMessages, &core.EncodedBlobMessage{ diff --git a/disperser/batcher/minibatcher.go b/disperser/batcher/minibatcher.go index d828960ac4..9ae7d1517c 100644 --- a/disperser/batcher/minibatcher.go +++ b/disperser/batcher/minibatcher.go @@ -317,7 +317,7 @@ func (b *Minibatcher) SendBlobsToOperatorWithRetries( blobMessages := make([]*core.EncodedBlobMessage, 0) hasAnyBundles := false for _, blob := range blobs { - if _, ok := blob.BundlesByOperator[opID]; ok { + if _, ok := blob.EncodedBundlesByOperator[opID]; ok { hasAnyBundles = true } blobMessages = append(blobMessages, &core.EncodedBlobMessage{ From 6fbf4684ffead2b9bb832c8bb8c7a4120ceaa085 Mon Sep 17 00:00:00 2001 From: Jian Xiao Date: Wed, 14 Aug 2024 01:05:05 +0000 Subject: [PATCH 04/19] EncodedBundles --- core/data.go | 5 ++++- disperser/batcher/encoding_streamer.go | 8 ++++---- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/core/data.go b/core/data.go index d362bb698d..5a2c03976b 100644 --- a/core/data.go +++ b/core/data.go @@ -267,7 +267,7 @@ type EncodedBlob struct { BlobHeader *BlobHeader BundlesByOperator map[OperatorID]Bundles // EncodedBundlesByOperator is bundles in encoded format (not deserialized) - EncodedBundlesByOperator map[OperatorID]map[QuorumID]*ChunksData + EncodedBundlesByOperator map[OperatorID]EncodedBundles } // A Bundle is the collection of chunks associated with a single blob, for a single operator and a single quorum. @@ -276,6 +276,9 @@ type Bundle []*encoding.Frame // Bundles is the collection of bundles associated with a single blob and a single operator. type Bundles map[QuorumID]Bundle +// This is similar to Bundle, but track chunks in encoded format (i.e. not deserialized). +type EncodedBundles map[QuorumID]*ChunksData + // BlobMessage is the message that is sent to DA nodes. It contains the blob header and the associated chunk bundles. type BlobMessage struct { BlobHeader *BlobHeader diff --git a/disperser/batcher/encoding_streamer.go b/disperser/batcher/encoding_streamer.go index 680b2d4193..8961f88f80 100644 --- a/disperser/batcher/encoding_streamer.go +++ b/disperser/batcher/encoding_streamer.go @@ -483,7 +483,7 @@ func (e *EncodingStreamer) CreateMinibatch(ctx context.Context) (*batch, error) blobHeaderByKey[blobKey] = blobHeader encodedBlobByKey[blobKey] = core.EncodedBlob{ BlobHeader: blobHeader, - EncodedBundlesByOperator: make(map[core.OperatorID]map[core.QuorumID]*core.ChunksData), + EncodedBundlesByOperator: make(map[core.OperatorID]core.EncodedBundles), } } @@ -491,7 +491,7 @@ func (e *EncodingStreamer) CreateMinibatch(ctx context.Context) (*batch, error) for opID, assignment := range result.Assignments { bundles, ok := encodedBlobByKey[blobKey].EncodedBundlesByOperator[opID] if !ok { - encodedBlobByKey[blobKey].EncodedBundlesByOperator[opID] = make(map[core.QuorumID]*core.ChunksData) + encodedBlobByKey[blobKey].EncodedBundlesByOperator[opID] = make(core.EncodedBundles) bundles = encodedBlobByKey[blobKey].EncodedBundlesByOperator[opID] } bundles[result.BlobQuorumInfo.QuorumID].Format = result.ChunksData.Format @@ -634,7 +634,7 @@ func (e *EncodingStreamer) CreateBatch(ctx context.Context) (*batch, error) { blobHeaderByKey[blobKey] = blobHeader encodedBlobByKey[blobKey] = core.EncodedBlob{ BlobHeader: blobHeader, - EncodedBundlesByOperator: make(map[core.OperatorID]map[core.QuorumID]*core.ChunksData), + EncodedBundlesByOperator: make(map[core.OperatorID]core.EncodedBundles), } } @@ -642,7 +642,7 @@ func (e *EncodingStreamer) CreateBatch(ctx context.Context) (*batch, error) { for opID, assignment := range result.Assignments { bundles, ok := encodedBlobByKey[blobKey].EncodedBundlesByOperator[opID] if !ok { - encodedBlobByKey[blobKey].EncodedBundlesByOperator[opID] = make(map[core.QuorumID]*core.ChunksData) + encodedBlobByKey[blobKey].EncodedBundlesByOperator[opID] = make(core.EncodedBundles) bundles = encodedBlobByKey[blobKey].EncodedBundlesByOperator[opID] } bundles[result.BlobQuorumInfo.QuorumID].Format = result.ChunksData.Format From d25dff7782e6f0dffdad9f1df58edc79af48712e Mon Sep 17 00:00:00 2001 From: Jian Xiao Date: Wed, 14 Aug 2024 03:05:14 +0000 Subject: [PATCH 05/19] ChunkData deser --- core/data.go | 27 ++++++++++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/core/data.go b/core/data.go index 5a2c03976b..611ba02b1f 100644 --- a/core/data.go +++ b/core/data.go @@ -87,6 +87,31 @@ func (cd *ChunksData) Size() uint64 { return size } +func (cd *ChunksData) Deserialize() ([]*encoding.Frame, error) { + frames := make([]*encoding.Frame, len(cd.Chunks)) + switch cd.Format { + case GobChunkEncodingFormat: + for _, data := range cd.Chunks { + fr, err := new(encoding.Frame).Deserialize(data) + if err != nil { + return nil, err + } + frames = append(frames, fr) + } + case GnarkChunkEncodingFormat: + for _, data := range cd.Chunks { + fr, err := new(encoding.Frame).DeserializeGnark(data) + if err != nil { + return nil, err + } + frames = append(frames, fr) + } + default: + return nil, fmt.Errorf("invalid chunk encoding format: %v", cd.Format) + } + return frames, nil +} + func (cd *ChunksData) FlattenToBundle() ([]byte, error) { // Only Gnark coded chunks are dispersed as a byte array. // Gob coded chunks are not flattened. @@ -276,7 +301,7 @@ type Bundle []*encoding.Frame // Bundles is the collection of bundles associated with a single blob and a single operator. type Bundles map[QuorumID]Bundle -// This is similar to Bundle, but track chunks in encoded format (i.e. not deserialized). +// This is similar to Bundle, but tracks chunks in encoded format (i.e. not deserialized). type EncodedBundles map[QuorumID]*ChunksData // BlobMessage is the message that is sent to DA nodes. It contains the blob header and the associated chunk bundles. From 82e26d6fc7d187c8ab512d3e42f53ec9bfe5a608 Mon Sep 17 00:00:00 2001 From: Jian Xiao Date: Wed, 14 Aug 2024 03:32:43 +0000 Subject: [PATCH 06/19] EncodedBundles <-> Bundles --- api/clients/mock/node_client.go | 11 ++++++- api/clients/retrieval_client_test.go | 10 +++++-- core/data.go | 44 +++++++++++++++++++++++++++- 3 files changed, 60 insertions(+), 5 deletions(-) diff --git a/api/clients/mock/node_client.go b/api/clients/mock/node_client.go index 42228ae95d..6e13ee61b3 100644 --- a/api/clients/mock/node_client.go +++ b/api/clients/mock/node_client.go @@ -54,9 +54,18 @@ func (c *MockNodeClient) GetChunks( ) { args := c.Called(opID, opInfo, batchHeaderHash, blobIndex) encodedBlob := (args.Get(0)).(core.EncodedBlob) + chunks, err := encodedBlob.EncodedBundlesByOperator[opID][quorumID].ToFrames() + if err != nil { + chunksChan <- clients.RetrievedChunks{ + OperatorID: opID, + Err: err, + Chunks: nil, + } + + } chunksChan <- clients.RetrievedChunks{ OperatorID: opID, Err: nil, - Chunks: encodedBlob.BundlesByOperator[opID][quorumID], + Chunks: chunks, } } diff --git a/api/clients/retrieval_client_test.go b/api/clients/retrieval_client_test.go index bd2e12f752..2eb1b72f40 100644 --- a/api/clients/retrieval_client_test.go +++ b/api/clients/retrieval_client_test.go @@ -60,8 +60,8 @@ var ( retrievalClient clients.RetrievalClient blobHeader *core.BlobHeader encodedBlob core.EncodedBlob = core.EncodedBlob{ - BlobHeader: nil, - BundlesByOperator: make(map[core.OperatorID]core.Bundles), + BlobHeader: nil, + EncodedBundlesByOperator: make(map[core.OperatorID]core.EncodedBundles), } batchHeaderHash [32]byte batchRoot [32]byte @@ -198,7 +198,11 @@ func setup(t *testing.T) { bundles := make(map[core.QuorumID]core.Bundle, len(blobHeader.QuorumInfos)) bundles[quorumID] = chunks[assignment.StartIndex : assignment.StartIndex+assignment.NumChunks] encodedBlob.BlobHeader = blobHeader - encodedBlob.BundlesByOperator[id] = bundles + eb, err := core.Bundles(bundles).ToEncodedBundles() + if err != nil { + t.Fatal(err) + } + encodedBlob.EncodedBundlesByOperator[id] = eb } } diff --git a/core/data.go b/core/data.go index 611ba02b1f..b50ae9ce7e 100644 --- a/core/data.go +++ b/core/data.go @@ -87,7 +87,25 @@ func (cd *ChunksData) Size() uint64 { return size } -func (cd *ChunksData) Deserialize() ([]*encoding.Frame, error) { +func (cd *ChunksData) FromFrames(fr []*encoding.Frame) (*ChunksData, error) { + if len(fr) == 0 { + return nil, errors.New("no frame is provided") + } + var c ChunksData + c.Format = GnarkChunkEncodingFormat + c.ChunkLen = fr[0].Length() + c.Chunks = make([][]byte, len(fr)) + for _, f := range fr { + bytes, err := f.SerializeGnark() + if err != nil { + return nil, err + } + c.Chunks = append(c.Chunks, bytes) + } + return &c, nil +} + +func (cd *ChunksData) ToFrames() ([]*encoding.Frame, error) { frames := make([]*encoding.Frame, len(cd.Chunks)) switch cd.Format { case GobChunkEncodingFormat: @@ -426,3 +444,27 @@ func (cb Bundles) Size() uint64 { } return size } + +func (cb Bundles) ToEncodedBundles() (EncodedBundles, error) { + eb := make(EncodedBundles) + for quorum, bundle := range cb { + cd, err := new(ChunksData).FromFrames(bundle) + if err != nil { + return nil, err + } + eb[quorum] = cd + } + return eb, nil +} + +func (cb Bundles) FromEncodedBundles(eb EncodedBundles) (Bundles, error) { + c := make(Bundles) + for quorum, chunkData := range eb { + fr, err := chunkData.ToFrames() + if err != nil { + return nil, err + } + c[quorum] = fr + } + return c, nil +} From 77c5e9557152cbee257ec4072c2dc630d7720fba Mon Sep 17 00:00:00 2001 From: Jian Xiao Date: Wed, 14 Aug 2024 03:57:55 +0000 Subject: [PATCH 07/19] core_test.go --- core/data.go | 4 ++-- core/test/core_test.go | 29 ++++++++++++++++++++++------- 2 files changed, 24 insertions(+), 9 deletions(-) diff --git a/core/data.go b/core/data.go index b50ae9ce7e..4cdd26b6cf 100644 --- a/core/data.go +++ b/core/data.go @@ -307,8 +307,8 @@ type BatchHeader struct { // EncodedBlob contains the messages to be sent to a group of DA nodes corresponding to a single blob type EncodedBlob struct { - BlobHeader *BlobHeader - BundlesByOperator map[OperatorID]Bundles + BlobHeader *BlobHeader + // BundlesByOperator map[OperatorID]Bundles // EncodedBundlesByOperator is bundles in encoded format (not deserialized) EncodedBundlesByOperator map[OperatorID]EncodedBundles } diff --git a/core/test/core_test.go b/core/test/core_test.go index 04dd74a79e..a443c81302 100644 --- a/core/test/core_test.go +++ b/core/test/core_test.go @@ -113,8 +113,8 @@ func prepareBatch(t *testing.T, operatorCount uint, blobs []core.Blob, bn uint) blobHeaders[z] = blobHeader encodedBlob := core.EncodedBlob{ - BlobHeader: blobHeader, - BundlesByOperator: make(map[core.OperatorID]core.Bundles), + BlobHeader: blobHeader, + EncodedBundlesByOperator: make(map[core.OperatorID]core.EncodedBundles), } encodedBlobs[z] = encodedBlob @@ -156,6 +156,12 @@ func prepareBatch(t *testing.T, operatorCount uint, blobs []core.Blob, bn uint) if err != nil { t.Fatal(err) } + bytes := make([][]byte, len(chunks)) + for _, c := range chunks { + serialized, err := c.SerializeGnark() + t.Fatal(err) + bytes = append(bytes, serialized) + } blobHeader.BlobCommitments = encoding.BlobCommitments{ Commitment: commitments.Commitment, @@ -167,13 +173,18 @@ func prepareBatch(t *testing.T, operatorCount uint, blobs []core.Blob, bn uint) blobHeader.QuorumInfos = append(blobHeader.QuorumInfos, quorumHeader) for id, assignment := range assignments { - _, ok := encodedBlob.BundlesByOperator[id] + chunksData := &core.ChunksData{ + Format: core.GnarkChunkEncodingFormat, + ChunkLen: int(chunkLength), + Chunks: bytes[assignment.StartIndex : assignment.StartIndex+assignment.NumChunks], + } + _, ok := encodedBlob.EncodedBundlesByOperator[id] if !ok { - encodedBlob.BundlesByOperator[id] = map[core.QuorumID]core.Bundle{ - quorumID: chunks[assignment.StartIndex : assignment.StartIndex+assignment.NumChunks], + encodedBlob.EncodedBundlesByOperator[id] = map[core.QuorumID]*core.ChunksData{ + quorumID: chunksData, } } else { - encodedBlob.BundlesByOperator[id][quorumID] = chunks[assignment.StartIndex : assignment.StartIndex+assignment.NumChunks] + encodedBlob.EncodedBundlesByOperator[id][quorumID] = chunksData } } @@ -207,9 +218,13 @@ func checkBatchByUniversalVerifier(cst core.IndexedChainState, encodedBlobs []co val.UpdateOperatorID(id) blobMessages := make([]*core.BlobMessage, numBlob) for z, encodedBlob := range encodedBlobs { + bundles, err := new(core.Bundles).FromEncodedBundles(encodedBlob.EncodedBundlesByOperator[id]) + if err != nil { + return err + } blobMessages[z] = &core.BlobMessage{ BlobHeader: encodedBlob.BlobHeader, - Bundles: encodedBlob.BundlesByOperator[id], + Bundles: bundles, } } err := val.ValidateBatch(&header, blobMessages, state.OperatorState, pool) From bbbee4d100b424090e815226d3677b15f81edfc5 Mon Sep 17 00:00:00 2001 From: Jian Xiao Date: Wed, 14 Aug 2024 04:04:11 +0000 Subject: [PATCH 08/19] encoding_treamer_test --- disperser/batcher/encoding_streamer_test.go | 32 ++++++++++----------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/disperser/batcher/encoding_streamer_test.go b/disperser/batcher/encoding_streamer_test.go index 688a832f11..cf9e924211 100644 --- a/disperser/batcher/encoding_streamer_test.go +++ b/disperser/batcher/encoding_streamer_test.go @@ -445,7 +445,7 @@ func TestPartialBlob(t *testing.T) { // Check EncodedBlobs assert.Len(t, batch.EncodedBlobs, 1) - assert.Len(t, batch.EncodedBlobs[0].BundlesByOperator, numOperators) + assert.Len(t, batch.EncodedBlobs[0].EncodedBundlesByOperator, numOperators) encodedBlob1 := batch.EncodedBlobs[0] assert.NotNil(t, encodedBlob1) @@ -465,10 +465,10 @@ func TestPartialBlob(t *testing.T) { }}) assert.Contains(t, batch.BlobHeaders, encodedBlob1.BlobHeader) - assert.Len(t, encodedBlob1.BundlesByOperator, numOperators) - for _, bundles := range encodedBlob1.BundlesByOperator { + assert.Len(t, encodedBlob1.EncodedBundlesByOperator, numOperators) + for _, bundles := range encodedBlob1.EncodedBundlesByOperator { assert.Len(t, bundles, 1) - assert.Greater(t, len(bundles[0]), 0) + assert.Greater(t, len(bundles[0].Chunks), 0) break } @@ -674,7 +674,7 @@ func TestGetBatch(t *testing.T) { // Check EncodedBlobs assert.Len(t, batch.EncodedBlobs, 2) - assert.Len(t, batch.EncodedBlobs[0].BundlesByOperator, numOperators) + assert.Len(t, batch.EncodedBlobs[0].EncodedBundlesByOperator, numOperators) var encodedBlob1 core.EncodedBlob var encodedBlob2 core.EncodedBlob @@ -718,10 +718,10 @@ func TestGetBatch(t *testing.T) { }) assert.Contains(t, batch.BlobHeaders, encodedBlob1.BlobHeader) - for _, bundles := range encodedBlob1.BundlesByOperator { + for _, bundles := range encodedBlob1.EncodedBundlesByOperator { assert.Len(t, bundles, 2) - assert.Greater(t, len(bundles[0]), 0) - assert.Greater(t, len(bundles[1]), 0) + assert.Greater(t, len(bundles[0].Chunks), 0) + assert.Greater(t, len(bundles[1].Chunks), 0) break } @@ -739,9 +739,9 @@ func TestGetBatch(t *testing.T) { }, ChunkLength: 8, }}) - for _, bundles := range encodedBlob2.BundlesByOperator { + for _, bundles := range encodedBlob2.EncodedBundlesByOperator { assert.Len(t, bundles, 1) - assert.Greater(t, len(bundles[core.QuorumID(2)]), 0) + assert.Greater(t, len(bundles[core.QuorumID(2)].Chunks), 0) break } assert.Len(t, batch.BlobHeaders, 2) @@ -842,7 +842,7 @@ func TestCreateMinibatch(t *testing.T) { // Check EncodedBlobs assert.Len(t, batch.EncodedBlobs, 2) - assert.Len(t, batch.EncodedBlobs[0].BundlesByOperator, numOperators) + assert.Len(t, batch.EncodedBlobs[0].EncodedBundlesByOperator, numOperators) var encodedBlob1 core.EncodedBlob var encodedBlob2 core.EncodedBlob @@ -886,10 +886,10 @@ func TestCreateMinibatch(t *testing.T) { }) assert.Contains(t, batch.BlobHeaders, encodedBlob1.BlobHeader) - for _, bundles := range encodedBlob1.BundlesByOperator { + for _, bundles := range encodedBlob1.EncodedBundlesByOperator { assert.Len(t, bundles, 2) - assert.Greater(t, len(bundles[0]), 0) - assert.Greater(t, len(bundles[1]), 0) + assert.Greater(t, len(bundles[0].Chunks), 0) + assert.Greater(t, len(bundles[1].Chunks), 0) break } @@ -907,9 +907,9 @@ func TestCreateMinibatch(t *testing.T) { }, ChunkLength: 8, }}) - for _, bundles := range encodedBlob2.BundlesByOperator { + for _, bundles := range encodedBlob2.EncodedBundlesByOperator { assert.Len(t, bundles, 1) - assert.Greater(t, len(bundles[core.QuorumID(2)]), 0) + assert.Greater(t, len(bundles[core.QuorumID(2)].Chunks), 0) break } assert.Len(t, batch.BlobHeaders, 2) From 863c08596c98b43f706066a914d455a960cd3bf1 Mon Sep 17 00:00:00 2001 From: Jian Xiao Date: Wed, 14 Aug 2024 04:21:40 +0000 Subject: [PATCH 09/19] fix --- disperser/batcher/encoding_streamer.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/disperser/batcher/encoding_streamer.go b/disperser/batcher/encoding_streamer.go index 8961f88f80..dc9bd8b9d2 100644 --- a/disperser/batcher/encoding_streamer.go +++ b/disperser/batcher/encoding_streamer.go @@ -380,6 +380,8 @@ func (e *EncodingStreamer) RequestEncodingForBlob(ctx context.Context, metadata return } + fmt.Println("XXX chunks format:", chunks.Format) + encoderChan <- EncodingResultOrStatus{ EncodingResult: EncodingResult{ BlobMetadata: metadata, @@ -494,6 +496,7 @@ func (e *EncodingStreamer) CreateMinibatch(ctx context.Context) (*batch, error) encodedBlobByKey[blobKey].EncodedBundlesByOperator[opID] = make(core.EncodedBundles) bundles = encodedBlobByKey[blobKey].EncodedBundlesByOperator[opID] } + bundles[result.BlobQuorumInfo.QuorumID] = new(core.ChunksData) bundles[result.BlobQuorumInfo.QuorumID].Format = result.ChunksData.Format bundles[result.BlobQuorumInfo.QuorumID].Chunks = append(bundles[result.BlobQuorumInfo.QuorumID].Chunks, result.ChunksData.Chunks[assignment.StartIndex:assignment.StartIndex+assignment.NumChunks]...) bundles[result.BlobQuorumInfo.QuorumID].ChunkLen = result.ChunksData.ChunkLen @@ -645,6 +648,7 @@ func (e *EncodingStreamer) CreateBatch(ctx context.Context) (*batch, error) { encodedBlobByKey[blobKey].EncodedBundlesByOperator[opID] = make(core.EncodedBundles) bundles = encodedBlobByKey[blobKey].EncodedBundlesByOperator[opID] } + bundles[result.BlobQuorumInfo.QuorumID] = new(core.ChunksData) bundles[result.BlobQuorumInfo.QuorumID].Format = result.ChunksData.Format bundles[result.BlobQuorumInfo.QuorumID].Chunks = append(bundles[result.BlobQuorumInfo.QuorumID].Chunks, result.ChunksData.Chunks[assignment.StartIndex:assignment.StartIndex+assignment.NumChunks]...) bundles[result.BlobQuorumInfo.QuorumID].ChunkLen = result.ChunksData.ChunkLen From 9d6a6df1d702b0fab5833cf3d4f84ef2148147c7 Mon Sep 17 00:00:00 2001 From: Jian Xiao Date: Wed, 14 Aug 2024 17:42:33 +0000 Subject: [PATCH 10/19] fix --- disperser/batcher/encoded_blob_store.go | 3 +++ disperser/batcher/encoding_streamer.go | 2 -- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/disperser/batcher/encoded_blob_store.go b/disperser/batcher/encoded_blob_store.go index fcf8baa21d..0e171697f3 100644 --- a/disperser/batcher/encoded_blob_store.go +++ b/disperser/batcher/encoded_blob_store.go @@ -197,5 +197,8 @@ func getRequestID(key disperser.BlobKey, quorumID core.QuorumID) requestID { // getChunksSize returns the total size of all the chunks in the encoded result in bytes func getChunksSize(result *EncodingResult) uint64 { + if result.ChunksData == nil { + return 0 + } return result.ChunksData.Size() } diff --git a/disperser/batcher/encoding_streamer.go b/disperser/batcher/encoding_streamer.go index dc9bd8b9d2..99173f6f83 100644 --- a/disperser/batcher/encoding_streamer.go +++ b/disperser/batcher/encoding_streamer.go @@ -380,8 +380,6 @@ func (e *EncodingStreamer) RequestEncodingForBlob(ctx context.Context, metadata return } - fmt.Println("XXX chunks format:", chunks.Format) - encoderChan <- EncodingResultOrStatus{ EncodingResult: EncodingResult{ BlobMetadata: metadata, From d803542a084712a7ff3cc44c9ce137acdd9e369e Mon Sep 17 00:00:00 2001 From: Jian Xiao Date: Wed, 14 Aug 2024 17:43:32 +0000 Subject: [PATCH 11/19] fix --- core/test/core_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/test/core_test.go b/core/test/core_test.go index a443c81302..ae7c94881b 100644 --- a/core/test/core_test.go +++ b/core/test/core_test.go @@ -159,7 +159,9 @@ func prepareBatch(t *testing.T, operatorCount uint, blobs []core.Blob, bn uint) bytes := make([][]byte, len(chunks)) for _, c := range chunks { serialized, err := c.SerializeGnark() - t.Fatal(err) + if err != nil { + t.Fatal(err) + } bytes = append(bytes, serialized) } From e936c0d48d06271584ee114bbc64c38f571edbe1 Mon Sep 17 00:00:00 2001 From: Jian Xiao Date: Wed, 14 Aug 2024 18:37:44 +0000 Subject: [PATCH 12/19] fix --- disperser/batcher/encoding_streamer_test.go | 8 ++++---- disperser/local_encoder_client.go | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/disperser/batcher/encoding_streamer_test.go b/disperser/batcher/encoding_streamer_test.go index cf9e924211..e2b446f932 100644 --- a/disperser/batcher/encoding_streamer_test.go +++ b/disperser/batcher/encoding_streamer_test.go @@ -246,12 +246,12 @@ func TestStreamingEncoding(t *testing.T) { assert.NotNil(t, encodedResult.Commitment.LengthProof) assert.Greater(t, encodedResult.Commitment.Length, uint(0)) assert.Len(t, encodedResult.Assignments, numOperators) - assert.Len(t, encodedResult.ChunksData, 32) + assert.Len(t, encodedResult.ChunksData.Chunks, 32) isRequested = encodingStreamer.EncodedBlobstore.HasEncodingRequested(metadataKey, core.QuorumID(0), 10) assert.True(t, isRequested) count, size = encodingStreamer.EncodedBlobstore.GetEncodedResultSize() assert.Equal(t, count, 1) - assert.Equal(t, size, uint64(16384)) + assert.Equal(t, size, uint64(26630)) // Cancel previous blob so it doesn't get reencoded. err = c.blobStore.MarkBlobFailed(ctx, metadataKey) @@ -281,7 +281,7 @@ func TestStreamingEncoding(t *testing.T) { assert.True(t, isRequested) count, size = encodingStreamer.EncodedBlobstore.GetEncodedResultSize() assert.Equal(t, count, 1) - assert.Equal(t, size, uint64(16384)) + assert.Equal(t, size, uint64(26630)) // Request the same blob, which should be dedupped _, err = c.blobStore.StoreBlob(ctx, &blob, requestedAt) @@ -292,7 +292,7 @@ func TestStreamingEncoding(t *testing.T) { // It should not have been added to the encoded blob store count, size = encodingStreamer.EncodedBlobstore.GetEncodedResultSize() assert.Equal(t, count, 1) - assert.Equal(t, size, uint64(16384)) + assert.Equal(t, size, uint64(26630)) } func TestEncodingFailure(t *testing.T) { diff --git a/disperser/local_encoder_client.go b/disperser/local_encoder_client.go index 7fabf27aa6..ed55efda0b 100644 --- a/disperser/local_encoder_client.go +++ b/disperser/local_encoder_client.go @@ -30,7 +30,7 @@ func (m *LocalEncoderClient) EncodeBlob(ctx context.Context, data []byte, encodi return nil, nil, err } - bytes := make([][]byte, len(chunks)) + bytes := make([][]byte, 0, len(chunks)) for _, c := range chunks { serialized, err := c.Serialize() if err != nil { From d12e44c42d682fe0d5e97c60e4021996de030c11 Mon Sep 17 00:00:00 2001 From: Jian Xiao Date: Wed, 14 Aug 2024 19:56:11 +0000 Subject: [PATCH 13/19] fix --- disperser/batcher/batcher_test.go | 2 +- disperser/batcher/encoding_streamer_test.go | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/disperser/batcher/batcher_test.go b/disperser/batcher/batcher_test.go index 29786e9205..2d3a27f1a8 100644 --- a/disperser/batcher/batcher_test.go +++ b/disperser/batcher/batcher_test.go @@ -227,7 +227,7 @@ func TestBatcherIterations(t *testing.T) { assert.NoError(t, err) count, size := components.encodingStreamer.EncodedBlobstore.GetEncodedResultSize() assert.Equal(t, 2, count) - assert.Equal(t, uint64(24576), size) // Robert checks it + assert.Equal(t, uint64(27631), size) // Robert checks it txn := types.NewTransaction(0, gethcommon.Address{}, big.NewInt(0), 0, big.NewInt(0), nil) components.transactor.On("BuildConfirmBatchTxn", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) { diff --git a/disperser/batcher/encoding_streamer_test.go b/disperser/batcher/encoding_streamer_test.go index e2b446f932..5c54dfc880 100644 --- a/disperser/batcher/encoding_streamer_test.go +++ b/disperser/batcher/encoding_streamer_test.go @@ -142,7 +142,7 @@ func TestEncodingQueueLimit(t *testing.T) { } func TestBatchTrigger(t *testing.T) { - encodingStreamer, c := createEncodingStreamer(t, 10, 20_000, streamerConfig) + encodingStreamer, c := createEncodingStreamer(t, 10, 30_000, streamerConfig) blob := makeTestBlob([]*core.SecurityParam{{ QuorumID: 0, @@ -160,7 +160,7 @@ func TestBatchTrigger(t *testing.T) { assert.Nil(t, err) count, size := encodingStreamer.EncodedBlobstore.GetEncodedResultSize() assert.Equal(t, count, 1) - assert.Equal(t, size, uint64(16384)) + assert.Equal(t, size, uint64(26630)) // try encode the same blobs again at different block (this happens when the blob is retried) encodingStreamer.ReferenceBlockNumber = 11 @@ -171,7 +171,7 @@ func TestBatchTrigger(t *testing.T) { count, size = encodingStreamer.EncodedBlobstore.GetEncodedResultSize() assert.Equal(t, count, 1) - assert.Equal(t, size, uint64(16384)) + assert.Equal(t, size, uint64(26630)) // don't notify yet select { @@ -190,7 +190,7 @@ func TestBatchTrigger(t *testing.T) { count, size = encodingStreamer.EncodedBlobstore.GetEncodedResultSize() assert.Equal(t, count, 2) - assert.Equal(t, size, uint64(16384)*2) + assert.Equal(t, size, uint64(26630)*2) // notify select { From 1954c1b8271f6128245846274823511e3c99e9f7 Mon Sep 17 00:00:00 2001 From: Jian Xiao Date: Wed, 14 Aug 2024 21:30:34 +0000 Subject: [PATCH 14/19] fix --- core/data.go | 2 +- core/test/core_test.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/data.go b/core/data.go index 4cdd26b6cf..064f5052d2 100644 --- a/core/data.go +++ b/core/data.go @@ -106,7 +106,7 @@ func (cd *ChunksData) FromFrames(fr []*encoding.Frame) (*ChunksData, error) { } func (cd *ChunksData) ToFrames() ([]*encoding.Frame, error) { - frames := make([]*encoding.Frame, len(cd.Chunks)) + frames := make([]*encoding.Frame, 0, len(cd.Chunks)) switch cd.Format { case GobChunkEncodingFormat: for _, data := range cd.Chunks { diff --git a/core/test/core_test.go b/core/test/core_test.go index ae7c94881b..9ee1ec6b10 100644 --- a/core/test/core_test.go +++ b/core/test/core_test.go @@ -156,9 +156,9 @@ func prepareBatch(t *testing.T, operatorCount uint, blobs []core.Blob, bn uint) if err != nil { t.Fatal(err) } - bytes := make([][]byte, len(chunks)) + bytes := make([][]byte, 0, len(chunks)) for _, c := range chunks { - serialized, err := c.SerializeGnark() + serialized, err := c.Serialize() if err != nil { t.Fatal(err) } @@ -176,7 +176,7 @@ func prepareBatch(t *testing.T, operatorCount uint, blobs []core.Blob, bn uint) for id, assignment := range assignments { chunksData := &core.ChunksData{ - Format: core.GnarkChunkEncodingFormat, + Format: core.GobChunkEncodingFormat, ChunkLen: int(chunkLength), Chunks: bytes[assignment.StartIndex : assignment.StartIndex+assignment.NumChunks], } From 6f927ba2b9b7cfa74682d0c97d9bb806e3449194 Mon Sep 17 00:00:00 2001 From: Jian Xiao Date: Wed, 14 Aug 2024 21:57:48 +0000 Subject: [PATCH 15/19] add test --- core/data.go | 2 +- core/data_test.go | 92 +++++++++++++++++++++++++++++------------------ 2 files changed, 59 insertions(+), 35 deletions(-) diff --git a/core/data.go b/core/data.go index 064f5052d2..b2280c243d 100644 --- a/core/data.go +++ b/core/data.go @@ -94,7 +94,7 @@ func (cd *ChunksData) FromFrames(fr []*encoding.Frame) (*ChunksData, error) { var c ChunksData c.Format = GnarkChunkEncodingFormat c.ChunkLen = fr[0].Length() - c.Chunks = make([][]byte, len(fr)) + c.Chunks = make([][]byte, 0, len(fr)) for _, f := range fr { bytes, err := f.SerializeGnark() if err != nil { diff --git a/core/data_test.go b/core/data_test.go index c062c72442..8d77f7ba83 100644 --- a/core/data_test.go +++ b/core/data_test.go @@ -33,6 +33,52 @@ func createBundle(t *testing.T, numFrames, numCoeffs, seed int) core.Bundle { return frames } +func createChunksData(t *testing.T, seed int) (core.Bundle, *core.ChunksData, *core.ChunksData) { + bundle := createBundle(t, 64, 64, seed) + gobChunks := make([][]byte, len(bundle)) + gnarkChunks := make([][]byte, len(bundle)) + for i, frame := range bundle { + gobChunk, err := frame.Serialize() + assert.Nil(t, err) + gobChunks[i] = gobChunk + + gnarkChunk, err := frame.SerializeGnark() + assert.Nil(t, err) + gnarkChunks[i] = gnarkChunk + } + gob := &core.ChunksData{ + Chunks: gobChunks, + Format: core.GobChunkEncodingFormat, + ChunkLen: 64, + } + gnark := &core.ChunksData{ + Chunks: gnarkChunks, + Format: core.GnarkChunkEncodingFormat, + ChunkLen: 64, + } + return bundle, gob, gnark +} + +func checkChunksDataEquivalence(t *testing.T, cd1, cd2 *core.ChunksData) { + assert.Equal(t, cd1.Format, cd2.Format) + assert.Equal(t, cd1.ChunkLen, cd2.ChunkLen) + assert.Equal(t, len(cd1.Chunks), len(cd2.Chunks)) + for i, c1 := range cd1.Chunks { + assert.True(t, bytes.Equal(c1, cd2.Chunks[i])) + } +} + +func checkBundleEquivalence(t *testing.T, b1, b2 core.Bundle) { + assert.Equal(t, len(b1), len(b2)) + for i := 0; i < len(b1); i++ { + assert.True(t, b1[i].Proof.Equal(&b2[i].Proof)) + assert.Equal(t, len(b1[i].Coeffs), len(b2[i].Coeffs)) + for j := 0; j < len(b1[i].Coeffs); j++ { + assert.True(t, b1[i].Coeffs[j].Equal(&b2[i].Coeffs[j])) + } + } +} + func TestInvalidBundleSer(t *testing.T) { b1 := createBundle(t, 1, 0, 0) _, err := b1.Serialize() @@ -86,43 +132,10 @@ func TestBundleEncoding(t *testing.T) { assert.Nil(t, err) decoded, err := new(core.Bundle).Deserialize(bytes) assert.Nil(t, err) - assert.Equal(t, len(bundle), len(decoded)) - for i := 0; i < len(bundle); i++ { - assert.True(t, bundle[i].Proof.Equal(&decoded[i].Proof)) - assert.Equal(t, len(bundle[i].Coeffs), len(decoded[i].Coeffs)) - for j := 0; j < len(bundle[i].Coeffs); j++ { - assert.True(t, bundle[i].Coeffs[j].Equal(&decoded[i].Coeffs[j])) - } - } + checkBundleEquivalence(t, bundle, decoded) } } -func createChunksData(t *testing.T, seed int) (core.Bundle, *core.ChunksData, *core.ChunksData) { - bundle := createBundle(t, 64, 64, seed) - gobChunks := make([][]byte, len(bundle)) - gnarkChunks := make([][]byte, len(bundle)) - for i, frame := range bundle { - gobChunk, err := frame.Serialize() - assert.Nil(t, err) - gobChunks[i] = gobChunk - - gnarkChunk, err := frame.SerializeGnark() - assert.Nil(t, err) - gnarkChunks[i] = gnarkChunk - } - gob := &core.ChunksData{ - Chunks: gobChunks, - Format: core.GobChunkEncodingFormat, - ChunkLen: 64, - } - gnark := &core.ChunksData{ - Chunks: gnarkChunks, - Format: core.GnarkChunkEncodingFormat, - ChunkLen: 64, - } - return bundle, gob, gnark -} - func TestChunksData(t *testing.T) { numTrials := 16 for i := 0; i < numTrials; i++ { @@ -156,6 +169,17 @@ func TestChunksData(t *testing.T) { bytesFromBundle, err := bundle.Serialize() assert.Nil(t, err) assert.True(t, bytes.Equal(bytesFromChunksData, bytesFromBundle)) + // FromFrames + cd, err := new(core.ChunksData).FromFrames(bundle) + assert.Nil(t, err) + checkChunksDataEquivalence(t, cd, gnark) + // ToFrames + fr1, err := gob.ToFrames() + assert.Nil(t, err) + checkBundleEquivalence(t, bundle, fr1) + fr2, err := gnark.ToFrames() + assert.Nil(t, err) + checkBundleEquivalence(t, bundle, fr2) // Invalid cases gnark.Chunks[0] = gnark.Chunks[0][1:] _, err = gnark.FlattenToBundle() From eda8d9ad601520df37b6ab51f9cc90a829a369e3 Mon Sep 17 00:00:00 2001 From: Jian Xiao Date: Wed, 14 Aug 2024 22:24:41 +0000 Subject: [PATCH 16/19] add more test --- core/data.go | 4 ++-- core/data_test.go | 30 ++++++++++++++++++++++++++++++ 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/core/data.go b/core/data.go index b2280c243d..90302b964f 100644 --- a/core/data.go +++ b/core/data.go @@ -307,8 +307,8 @@ type BatchHeader struct { // EncodedBlob contains the messages to be sent to a group of DA nodes corresponding to a single blob type EncodedBlob struct { - BlobHeader *BlobHeader - // BundlesByOperator map[OperatorID]Bundles + BlobHeader *BlobHeader + BundlesByOperator map[OperatorID]Bundles // EncodedBundlesByOperator is bundles in encoded format (not deserialized) EncodedBundlesByOperator map[OperatorID]EncodedBundles } diff --git a/core/data_test.go b/core/data_test.go index 8d77f7ba83..f602b73e72 100644 --- a/core/data_test.go +++ b/core/data_test.go @@ -136,6 +136,36 @@ func TestBundleEncoding(t *testing.T) { } } +func TestEncodedBundles(t *testing.T) { + numTrials := 16 + for i := 0; i < numTrials; i++ { + bundles := core.Bundles(map[core.QuorumID]core.Bundle{ + 0: createBundle(t, 64, 64, i), + 1: createBundle(t, 64, 64, i+numTrials), + }) + // ToEncodedBundles + ec, err := bundles.ToEncodedBundles() + assert.Nil(t, err) + assert.Equal(t, len(ec), len(bundles)) + for quorum, bundle := range bundles { + cd, ok := ec[quorum] + assert.True(t, ok) + fr, err := cd.ToFrames() + assert.Nil(t, err) + checkBundleEquivalence(t, fr, bundle) + } + // FromEncodedBundles + bundles2, err := new(core.Bundles).FromEncodedBundles(ec) + assert.Nil(t, err) + assert.Equal(t, len(bundles2), len(bundles)) + for quorum, bundle := range bundles { + b, ok := bundles2[quorum] + assert.True(t, ok) + checkBundleEquivalence(t, b, bundle) + } + } +} + func TestChunksData(t *testing.T) { numTrials := 16 for i := 0; i < numTrials; i++ { From 2e1ccb15e06cea213904bd8ca09f12212909c9e4 Mon Sep 17 00:00:00 2001 From: Jian Xiao Date: Wed, 14 Aug 2024 22:33:04 +0000 Subject: [PATCH 17/19] fix --- disperser/batcher/encoded_blob_store.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/disperser/batcher/encoded_blob_store.go b/disperser/batcher/encoded_blob_store.go index 0e171697f3..6deccee54c 100644 --- a/disperser/batcher/encoded_blob_store.go +++ b/disperser/batcher/encoded_blob_store.go @@ -197,7 +197,7 @@ func getRequestID(key disperser.BlobKey, quorumID core.QuorumID) requestID { // getChunksSize returns the total size of all the chunks in the encoded result in bytes func getChunksSize(result *EncodingResult) uint64 { - if result.ChunksData == nil { + if result == nil || result.ChunksData == nil { return 0 } return result.ChunksData.Size() From 290b37e65768bebfe10a9fb7951ab25f1fc0a658 Mon Sep 17 00:00:00 2001 From: Jian Xiao Date: Thu, 15 Aug 2024 02:14:00 +0000 Subject: [PATCH 18/19] fix --- core/data.go | 10 ++++++---- core/data_test.go | 10 ++-------- 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/core/data.go b/core/data.go index 90302b964f..45f9e750f8 100644 --- a/core/data.go +++ b/core/data.go @@ -171,8 +171,9 @@ func (cd *ChunksData) ToGobFormat() (*ChunksData, error) { gobChunks = append(gobChunks, gob) } return &ChunksData{ - Chunks: gobChunks, - Format: GobChunkEncodingFormat, + Chunks: gobChunks, + Format: GobChunkEncodingFormat, + ChunkLen: cd.ChunkLen, }, nil } @@ -196,8 +197,9 @@ func (cd *ChunksData) ToGnarkFormat() (*ChunksData, error) { gnarkChunks = append(gnarkChunks, gnark) } return &ChunksData{ - Chunks: gnarkChunks, - Format: GnarkChunkEncodingFormat, + Chunks: gnarkChunks, + Format: GnarkChunkEncodingFormat, + ChunkLen: cd.ChunkLen, }, nil } diff --git a/core/data_test.go b/core/data_test.go index f602b73e72..84cb5097e9 100644 --- a/core/data_test.go +++ b/core/data_test.go @@ -179,20 +179,14 @@ func TestChunksData(t *testing.T) { assert.Equal(t, convertedGob, gob) convertedGob, err = gnark.ToGobFormat() assert.Nil(t, err) - assert.Equal(t, len(gob.Chunks), len(convertedGob.Chunks)) - for i := 0; i < len(gob.Chunks); i++ { - assert.True(t, bytes.Equal(gob.Chunks[i], convertedGob.Chunks[i])) - } + checkChunksDataEquivalence(t, gob, convertedGob) // ToGnarkFormat convertedGnark, err := gnark.ToGnarkFormat() assert.Nil(t, err) assert.Equal(t, convertedGnark, gnark) convertedGnark, err = gob.ToGnarkFormat() assert.Nil(t, err) - assert.Equal(t, len(gnark.Chunks), len(convertedGnark.Chunks)) - for i := 0; i < len(gnark.Chunks); i++ { - assert.True(t, bytes.Equal(gnark.Chunks[i], convertedGnark.Chunks[i])) - } + checkChunksDataEquivalence(t, gnark, convertedGnark) // FlattenToBundle bytesFromChunksData, err := gnark.FlattenToBundle() assert.Nil(t, err) From 92e576cc0c47bb6ea84a2065d52b4e460e3df4cf Mon Sep 17 00:00:00 2001 From: Jian Xiao Date: Mon, 19 Aug 2024 18:02:59 +0000 Subject: [PATCH 19/19] feedback --- disperser/batcher/batcher_test.go | 2 +- node/grpc/server_load_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/disperser/batcher/batcher_test.go b/disperser/batcher/batcher_test.go index 2d3a27f1a8..0a2e8d91a7 100644 --- a/disperser/batcher/batcher_test.go +++ b/disperser/batcher/batcher_test.go @@ -227,7 +227,7 @@ func TestBatcherIterations(t *testing.T) { assert.NoError(t, err) count, size := components.encodingStreamer.EncodedBlobstore.GetEncodedResultSize() assert.Equal(t, 2, count) - assert.Equal(t, uint64(27631), size) // Robert checks it + assert.Equal(t, uint64(27631), size) txn := types.NewTransaction(0, gethcommon.Address{}, big.NewInt(0), 0, big.NewInt(0), nil) components.transactor.On("BuildConfirmBatchTxn", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) { diff --git a/node/grpc/server_load_test.go b/node/grpc/server_load_test.go index c216c0269b..319da4a174 100644 --- a/node/grpc/server_load_test.go +++ b/node/grpc/server_load_test.go @@ -84,7 +84,7 @@ func makeBatch(t *testing.T, blobSize int, numBlobs int, advThreshold, quorumThr for opID, assignment := range quorumInfo.Assignments { blobMessagesByOp[opID] = append(blobMessagesByOp[opID], &core.EncodedBlobMessage{ BlobHeader: blobHeaders[i], - EncodedBundles: make(map[core.QuorumID]*core.ChunksData), + EncodedBundles: make(core.EncodedBundles), }) blobMessagesByOp[opID][i].EncodedBundles[0].Format = core.GobChunkEncodingFormat blobMessagesByOp[opID][i].EncodedBundles[0].ChunkLen = int(params.ChunkLength)