
[2/N][zero serialization] Make Batcher operate on chunks without ser/deser #700

Merged (19 commits) on Aug 19, 2024
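As an orienting example (an editor's sketch, not code from this PR; the helper name keepSerialized is hypothetical and the import paths assume the github.com/Layr-Labs/eigenda module), the pattern this change adopts looks roughly like this: chunks are serialized once into a core.ChunksData and carried around as raw bytes, and encoding.Frame values are only reconstructed where deserialized frames are actually needed.

package example

import (
	"github.com/Layr-Labs/eigenda/core"
	"github.com/Layr-Labs/eigenda/encoding"
)

// keepSerialized (hypothetical) serializes frames once into a ChunksData and only
// deserializes them back into frames at the point of use.
func keepSerialized(frames []*encoding.Frame) ([]*encoding.Frame, error) {
	cd, err := new(core.ChunksData).FromFrames(frames) // serialize once (Gnark format)
	if err != nil {
		return nil, err
	}
	// cd.Chunks ([][]byte) can be stored, sized, and dispersed with no further
	// ser/deser work; cd.Size() reports the serialized size used for accounting.
	_ = cd.Size()

	// Reconstruct frames only where they are actually required (e.g. validation).
	return cd.ToFrames()
}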
11 changes: 10 additions & 1 deletion api/clients/mock/node_client.go
@@ -54,9 +54,18 @@ func (c *MockNodeClient) GetChunks(
) {
args := c.Called(opID, opInfo, batchHeaderHash, blobIndex)
encodedBlob := (args.Get(0)).(core.EncodedBlob)
chunks, err := encodedBlob.EncodedBundlesByOperator[opID][quorumID].ToFrames()
if err != nil {
chunksChan <- clients.RetrievedChunks{
OperatorID: opID,
Err: err,
Chunks: nil,
}
// return so a success message is not also sent after an error
return
}
chunksChan <- clients.RetrievedChunks{
OperatorID: opID,
Err: nil,
Chunks: encodedBlob.BundlesByOperator[opID][quorumID],
Chunks: chunks,
}
}
10 changes: 7 additions & 3 deletions api/clients/retrieval_client_test.go
@@ -60,8 +60,8 @@ var (
retrievalClient clients.RetrievalClient
blobHeader *core.BlobHeader
encodedBlob core.EncodedBlob = core.EncodedBlob{
BlobHeader: nil,
BundlesByOperator: make(map[core.OperatorID]core.Bundles),
BlobHeader: nil,
EncodedBundlesByOperator: make(map[core.OperatorID]core.EncodedBundles),
}
batchHeaderHash [32]byte
batchRoot [32]byte
@@ -198,7 +198,11 @@ func setup(t *testing.T) {
bundles := make(map[core.QuorumID]core.Bundle, len(blobHeader.QuorumInfos))
bundles[quorumID] = chunks[assignment.StartIndex : assignment.StartIndex+assignment.NumChunks]
encodedBlob.BlobHeader = blobHeader
encodedBlob.BundlesByOperator[id] = bundles
eb, err := core.Bundles(bundles).ToEncodedBundles()
if err != nil {
t.Fatal(err)
}
encodedBlob.EncodedBundlesByOperator[id] = eb
}

}
80 changes: 80 additions & 0 deletions core/data.go
@@ -87,6 +87,49 @@ func (cd *ChunksData) Size() uint64 {
return size
}

func (cd *ChunksData) FromFrames(fr []*encoding.Frame) (*ChunksData, error) {
if len(fr) == 0 {
return nil, errors.New("no frame is provided")
}
var c ChunksData
c.Format = GnarkChunkEncodingFormat
c.ChunkLen = fr[0].Length()
c.Chunks = make([][]byte, 0, len(fr))
for _, f := range fr {
bytes, err := f.SerializeGnark()
if err != nil {
return nil, err
}
c.Chunks = append(c.Chunks, bytes)
}
return &c, nil
}

func (cd *ChunksData) ToFrames() ([]*encoding.Frame, error) {
frames := make([]*encoding.Frame, 0, len(cd.Chunks))
switch cd.Format {
case GobChunkEncodingFormat:
for _, data := range cd.Chunks {
fr, err := new(encoding.Frame).Deserialize(data)
if err != nil {
return nil, err
}
frames = append(frames, fr)
}
case GnarkChunkEncodingFormat:
for _, data := range cd.Chunks {
fr, err := new(encoding.Frame).DeserializeGnark(data)
if err != nil {
return nil, err
}
frames = append(frames, fr)
}
default:
return nil, fmt.Errorf("invalid chunk encoding format: %v", cd.Format)
}
return frames, nil
}

func (cd *ChunksData) FlattenToBundle() ([]byte, error) {
// Only Gnark coded chunks are dispersed as a byte array.
// Gob coded chunks are not flattened.
@@ -266,6 +309,8 @@ type BatchHeader struct {
type EncodedBlob struct {
BlobHeader *BlobHeader
BundlesByOperator map[OperatorID]Bundles
// EncodedBundlesByOperator is bundles in encoded format (not deserialized)
EncodedBundlesByOperator map[OperatorID]EncodedBundles
}

// A Bundle is the collection of chunks associated with a single blob, for a single operator and a single quorum.
@@ -274,12 +319,23 @@ type Bundle []*encoding.Frame
// Bundles is the collection of bundles associated with a single blob and a single operator.
type Bundles map[QuorumID]Bundle

// This is similar to Bundle, but tracks chunks in encoded format (i.e. not deserialized).
type EncodedBundles map[QuorumID]*ChunksData

// BlobMessage is the message that is sent to DA nodes. It contains the blob header and the associated chunk bundles.
type BlobMessage struct {
BlobHeader *BlobHeader
Bundles Bundles
}

// This is similar to BlobMessage, but keeps the commitments and chunks in encoded format
// (i.e. not deserialized)
type EncodedBlobMessage struct {
// TODO(jianoaix): Change the commitments to encoded format.
BlobHeader *BlobHeader
EncodedBundles map[QuorumID]*ChunksData
}

func (b Bundle) Size() uint64 {
size := uint64(0)
for _, chunk := range b {
@@ -388,3 +444,27 @@ func (cb Bundles) Size() uint64 {
}
return size
}

func (cb Bundles) ToEncodedBundles() (EncodedBundles, error) {
eb := make(EncodedBundles)
for quorum, bundle := range cb {
cd, err := new(ChunksData).FromFrames(bundle)
if err != nil {
return nil, err
}
eb[quorum] = cd
}
return eb, nil
}

func (cb Bundles) FromEncodedBundles(eb EncodedBundles) (Bundles, error) {
c := make(Bundles)
for quorum, chunkData := range eb {
fr, err := chunkData.ToFrames()
if err != nil {
return nil, err
}
c[quorum] = fr
}
return c, nil
}
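To make the new conversion surface concrete, here is a hypothetical round-trip over the helpers added above (a sketch under the same module-path assumption, not part of the diff): a Bundles map is converted to its serialized EncodedBundles form and back, mirroring what the updated disperser and validator code do at their boundaries.

package example

import "github.com/Layr-Labs/eigenda/core"

// roundTrip (hypothetical) converts deserialized bundles into the serialized
// representation the Batcher now carries, then back into frames.
func roundTrip(bundles core.Bundles) (core.Bundles, error) {
	// Frames -> serialized chunks, one ChunksData per quorum.
	encoded, err := bundles.ToEncodedBundles()
	if err != nil {
		return nil, err
	}
	// Serialized chunks -> frames, only where deserialized frames are needed.
	return new(core.Bundles).FromEncodedBundles(encoded)
}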
112 changes: 83 additions & 29 deletions core/data_test.go
@@ -33,6 +33,52 @@ func createBundle(t *testing.T, numFrames, numCoeffs, seed int) core.Bundle {
return frames
}

func createChunksData(t *testing.T, seed int) (core.Bundle, *core.ChunksData, *core.ChunksData) {
bundle := createBundle(t, 64, 64, seed)
gobChunks := make([][]byte, len(bundle))
gnarkChunks := make([][]byte, len(bundle))
for i, frame := range bundle {
gobChunk, err := frame.Serialize()
assert.Nil(t, err)
gobChunks[i] = gobChunk

gnarkChunk, err := frame.SerializeGnark()
assert.Nil(t, err)
gnarkChunks[i] = gnarkChunk
}
gob := &core.ChunksData{
Chunks: gobChunks,
Format: core.GobChunkEncodingFormat,
ChunkLen: 64,
}
gnark := &core.ChunksData{
Chunks: gnarkChunks,
Format: core.GnarkChunkEncodingFormat,
ChunkLen: 64,
}
return bundle, gob, gnark
}

func checkChunksDataEquivalence(t *testing.T, cd1, cd2 *core.ChunksData) {
assert.Equal(t, cd1.Format, cd2.Format)
assert.Equal(t, cd1.ChunkLen, cd2.ChunkLen)
assert.Equal(t, len(cd1.Chunks), len(cd2.Chunks))
for i, c1 := range cd1.Chunks {
assert.True(t, bytes.Equal(c1, cd2.Chunks[i]))
}
}

func checkBundleEquivalence(t *testing.T, b1, b2 core.Bundle) {
assert.Equal(t, len(b1), len(b2))
for i := 0; i < len(b1); i++ {
assert.True(t, b1[i].Proof.Equal(&b2[i].Proof))
assert.Equal(t, len(b1[i].Coeffs), len(b2[i].Coeffs))
for j := 0; j < len(b1[i].Coeffs); j++ {
assert.True(t, b1[i].Coeffs[j].Equal(&b2[i].Coeffs[j]))
}
}
}

func TestInvalidBundleSer(t *testing.T) {
b1 := createBundle(t, 1, 0, 0)
_, err := b1.Serialize()
@@ -86,41 +132,38 @@ func TestBundleEncoding(t *testing.T) {
assert.Nil(t, err)
decoded, err := new(core.Bundle).Deserialize(bytes)
assert.Nil(t, err)
assert.Equal(t, len(bundle), len(decoded))
for i := 0; i < len(bundle); i++ {
assert.True(t, bundle[i].Proof.Equal(&decoded[i].Proof))
assert.Equal(t, len(bundle[i].Coeffs), len(decoded[i].Coeffs))
for j := 0; j < len(bundle[i].Coeffs); j++ {
assert.True(t, bundle[i].Coeffs[j].Equal(&decoded[i].Coeffs[j]))
}
}
checkBundleEquivalence(t, bundle, decoded)
}
}

Removed (createChunksData moved earlier in the file):

func createChunksData(t *testing.T, seed int) (core.Bundle, *core.ChunksData, *core.ChunksData) {
bundle := createBundle(t, 64, 64, seed)
gobChunks := make([][]byte, len(bundle))
gnarkChunks := make([][]byte, len(bundle))
for i, frame := range bundle {
gobChunk, err := frame.Serialize()
assert.Nil(t, err)
gobChunks[i] = gobChunk

gnarkChunk, err := frame.SerializeGnark()
assert.Nil(t, err)
gnarkChunks[i] = gnarkChunk
}
gob := &core.ChunksData{
Chunks: gobChunks,
Format: core.GobChunkEncodingFormat,
ChunkLen: 64,
}
gnark := &core.ChunksData{
Chunks: gnarkChunks,
Format: core.GnarkChunkEncodingFormat,
ChunkLen: 64,
}
return bundle, gob, gnark
}

Added:

func TestEncodedBundles(t *testing.T) {
numTrials := 16
for i := 0; i < numTrials; i++ {
bundles := core.Bundles(map[core.QuorumID]core.Bundle{
0: createBundle(t, 64, 64, i),
1: createBundle(t, 64, 64, i+numTrials),
})
// ToEncodedBundles
ec, err := bundles.ToEncodedBundles()
assert.Nil(t, err)
assert.Equal(t, len(ec), len(bundles))
for quorum, bundle := range bundles {
cd, ok := ec[quorum]
assert.True(t, ok)
fr, err := cd.ToFrames()
assert.Nil(t, err)
checkBundleEquivalence(t, fr, bundle)
}
// FromEncodedBundles
bundles2, err := new(core.Bundles).FromEncodedBundles(ec)
assert.Nil(t, err)
assert.Equal(t, len(bundles2), len(bundles))
for quorum, bundle := range bundles {
b, ok := bundles2[quorum]
assert.True(t, ok)
checkBundleEquivalence(t, b, bundle)
}
}
}

func TestChunksData(t *testing.T) {
@@ -156,6 +199,17 @@ func TestChunksData(t *testing.T) {
bytesFromBundle, err := bundle.Serialize()
assert.Nil(t, err)
assert.True(t, bytes.Equal(bytesFromChunksData, bytesFromBundle))
// FromFrames
cd, err := new(core.ChunksData).FromFrames(bundle)
assert.Nil(t, err)
checkChunksDataEquivalence(t, cd, gnark)
// ToFrames
fr1, err := gob.ToFrames()
assert.Nil(t, err)
checkBundleEquivalence(t, bundle, fr1)
fr2, err := gnark.ToFrames()
assert.Nil(t, err)
checkBundleEquivalence(t, bundle, fr2)
// Invalid cases
gnark.Chunks[0] = gnark.Chunks[0][1:]
_, err = gnark.FlattenToBundle()
31 changes: 24 additions & 7 deletions core/test/core_test.go
@@ -113,8 +113,8 @@ func prepareBatch(t *testing.T, operatorCount uint, blobs []core.Blob, bn uint)
blobHeaders[z] = blobHeader

encodedBlob := core.EncodedBlob{
BlobHeader: blobHeader,
BundlesByOperator: make(map[core.OperatorID]core.Bundles),
BlobHeader: blobHeader,
EncodedBundlesByOperator: make(map[core.OperatorID]core.EncodedBundles),
}
encodedBlobs[z] = encodedBlob

@@ -156,6 +156,14 @@
if err != nil {
t.Fatal(err)
}
bytes := make([][]byte, 0, len(chunks))
for _, c := range chunks {
serialized, err := c.Serialize()
if err != nil {
t.Fatal(err)
}
bytes = append(bytes, serialized)
}

blobHeader.BlobCommitments = encoding.BlobCommitments{
Commitment: commitments.Commitment,
@@ -167,13 +175,18 @@
blobHeader.QuorumInfos = append(blobHeader.QuorumInfos, quorumHeader)

for id, assignment := range assignments {
_, ok := encodedBlob.BundlesByOperator[id]
chunksData := &core.ChunksData{
Format: core.GobChunkEncodingFormat,
ChunkLen: int(chunkLength),
Chunks: bytes[assignment.StartIndex : assignment.StartIndex+assignment.NumChunks],
}
_, ok := encodedBlob.EncodedBundlesByOperator[id]
if !ok {
encodedBlob.BundlesByOperator[id] = map[core.QuorumID]core.Bundle{
quorumID: chunks[assignment.StartIndex : assignment.StartIndex+assignment.NumChunks],
encodedBlob.EncodedBundlesByOperator[id] = map[core.QuorumID]*core.ChunksData{
quorumID: chunksData,
}
} else {
encodedBlob.BundlesByOperator[id][quorumID] = chunks[assignment.StartIndex : assignment.StartIndex+assignment.NumChunks]
encodedBlob.EncodedBundlesByOperator[id][quorumID] = chunksData
}
}

@@ -207,9 +220,13 @@ func checkBatchByUniversalVerifier(cst core.IndexedChainState, encodedBlobs []co
val.UpdateOperatorID(id)
blobMessages := make([]*core.BlobMessage, numBlob)
for z, encodedBlob := range encodedBlobs {
bundles, err := new(core.Bundles).FromEncodedBundles(encodedBlob.EncodedBundlesByOperator[id])
if err != nil {
return err
}
blobMessages[z] = &core.BlobMessage{
BlobHeader: encodedBlob.BlobHeader,
Bundles: encodedBlob.BundlesByOperator[id],
Bundles: bundles,
}
}
err := val.ValidateBatch(&header, blobMessages, state.OperatorState, pool)
2 changes: 1 addition & 1 deletion disperser/batcher/batcher_test.go
@@ -227,7 +227,7 @@ func TestBatcherIterations(t *testing.T) {
assert.NoError(t, err)
count, size := components.encodingStreamer.EncodedBlobstore.GetEncodedResultSize()
assert.Equal(t, 2, count)
assert.Equal(t, uint64(24576), size) // Robert checks it
assert.Equal(t, uint64(27631), size) // Robert checks it

Contributor: remove Robert checks it

Contributor Author: Done

txn := types.NewTransaction(0, gethcommon.Address{}, big.NewInt(0), 0, big.NewInt(0), nil)
components.transactor.On("BuildConfirmBatchTxn", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
7 changes: 5 additions & 2 deletions disperser/batcher/encoded_blob_store.go
@@ -29,7 +29,7 @@ type EncodingResult struct {
ReferenceBlockNumber uint
BlobQuorumInfo *core.BlobQuorumInfo
Commitment *encoding.BlobCommitments
Chunks []*encoding.Frame
ChunksData *core.ChunksData
Assignments map[core.OperatorID]core.Assignment
}

@@ -197,5 +197,8 @@ func getRequestID(key disperser.BlobKey, quorumID core.QuorumID) requestID {

// getChunksSize returns the total size of all the chunks in the encoded result in bytes
func getChunksSize(result *EncodingResult) uint64 {
return core.Bundle(result.Chunks).Size()
if result == nil || result.ChunksData == nil {
return 0
}
return result.ChunksData.Size()
}