Skip to content

Commit

Permalink
[5/N] Chunk encoding optimization: Add support of new encoding at Node
Browse files Browse the repository at this point in the history
  • Loading branch information
jianoaix committed Jul 12, 2024
1 parent 660f0f1 commit c7b0de7
Show file tree
Hide file tree
Showing 8 changed files with 414 additions and 231 deletions.
319 changes: 160 additions & 159 deletions api/grpc/node/node.pb.go

Large diffs are not rendered by default.

18 changes: 10 additions & 8 deletions api/proto/node/node.proto
Original file line number Diff line number Diff line change
Expand Up @@ -55,19 +55,21 @@ message RetrieveChunksRequest {
uint32 quorum_id = 3;
}

// This describes how the chunks returned in RetrieveChunksReply are encoded.
// Used to facilitate the decoding of chunks.
enum ChunkEncoding {
UNKNOWN = 0;
GNARK = 1;
GOB = 2;
}

message RetrieveChunksReply {
// All chunks the Node is storing for the requested blob per RetrieveChunksRequest.
repeated bytes chunks = 1;
// Describes how the chunks above are encoded.
enum ChunkEncoding {
UNKNOWN = 0;
GNARK = 1;
GOB = 2;
}
// How the above chunks are encoded.
ChunkEncoding encoding = 2;
}


// See RetrieveChunksRequest for documentation of each parameter of GetBlobHeaderRequest.
message GetBlobHeaderRequest {
bytes batch_header_hash = 1;
Expand Down Expand Up @@ -177,5 +179,5 @@ message NodeInfoReply {
string arch = 2;
string os = 3;
uint32 num_cpu = 4;
uint64 mem_bytes = 5;
uint64 mem_bytes = 5;
}
7 changes: 4 additions & 3 deletions core/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const (
// different IDs).
MaxQuorumID = 254

// The encoding format for bundle. Values must be in range [0, 255].
GnarkBundleEncodingFormat = 1
)

Expand Down Expand Up @@ -198,7 +199,7 @@ func (b Bundle) Serialize() ([]byte, error) {
}
result := make([]byte, size+8)
buf := result
metadata := uint64(GnarkBundleEncodingFormat) | (uint64(len(b[0].Coeffs)) << 8)
metadata := (uint64(GnarkBundleEncodingFormat) << 56) | uint64(len(b[0].Coeffs))
binary.LittleEndian.PutUint64(buf, metadata)
buf = buf[8:]
for _, f := range b {
Expand All @@ -218,10 +219,10 @@ func (b Bundle) Deserialize(data []byte) (Bundle, error) {
}
// Parse metadata
meta := binary.LittleEndian.Uint64(data)
if (meta & 0xFF) != GnarkBundleEncodingFormat {
if (meta >> 56) != GnarkBundleEncodingFormat {
return nil, errors.New("invalid bundle data encoding format")
}
chunkLen := meta >> 8
chunkLen := meta << 8 >> 8
if chunkLen == 0 {
return nil, errors.New("chunk length must be greater than zero")
}
Expand Down
4 changes: 2 additions & 2 deletions core/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,19 +58,19 @@ func TestInvalidBundleDeser(t *testing.T) {
assert.EqualError(t, err, "invalid bundle data encoding format")

invliadChunkLen := make([]byte, 0, 8)
invliadChunkLen = append(invliadChunkLen, byte(1))
for i := 0; i < 7; i++ {
invliadChunkLen = append(invliadChunkLen, byte(0))
}
invliadChunkLen = append(invliadChunkLen, byte(1))
_, err = new(core.Bundle).Deserialize(invliadChunkLen)
assert.EqualError(t, err, "chunk length must be greater than zero")

data := make([]byte, 0, 9)
data = append(data, byte(1))
for i := 0; i < 6; i++ {
data = append(data, byte(0))
}
data = append(data, byte(0b00100000))
data = append(data, byte(1))
data = append(data, byte(5))
data = append(data, byte(0b01000000))
_, err = new(core.Bundle).Deserialize(data)
Expand Down
4 changes: 2 additions & 2 deletions node/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,13 +294,13 @@ func (s *Server) RetrieveChunks(ctx context.Context, in *pb.RetrieveChunksReques
return nil, errors.New("request rate limited")
}

chunks, ok := s.node.Store.GetChunks(ctx, batchHeaderHash, int(in.GetBlobIndex()), uint8(in.GetQuorumId()))
chunks, format, ok := s.node.Store.GetChunks(ctx, batchHeaderHash, int(in.GetBlobIndex()), uint8(in.GetQuorumId()))
if !ok {
s.node.Metrics.RecordRPCRequest("RetrieveChunks", "failure", time.Since(start))
return nil, fmt.Errorf("could not find chunks for batchHeaderHash %v, blob index: %v, quorumID: %v", batchHeaderHash, in.GetBlobIndex(), in.GetQuorumId())
}
s.node.Metrics.RecordRPCRequest("RetrieveChunks", "success", time.Since(start))
return &pb.RetrieveChunksReply{Chunks: chunks}, nil
return &pb.RetrieveChunksReply{Chunks: chunks, Encoding: format}, nil
}

func (s *Server) GetBlobHeader(ctx context.Context, in *pb.GetBlobHeaderRequest) (*pb.GetBlobHeaderReply, error) {
Expand Down
22 changes: 17 additions & 5 deletions node/grpc/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,28 @@ func GetBlobMessages(in *pb.StoreChunksRequest, numWorkers int) ([]*core.BlobMes
}

bundles := make(map[core.QuorumID]core.Bundle, len(blob.GetBundles()))
for j, chunks := range blob.GetBundles() {
for j, bundle := range blob.GetBundles() {
quorumID := blob.GetHeader().GetQuorumHeaders()[j].GetQuorumId()
bundles[uint8(quorumID)] = make([]*encoding.Frame, len(chunks.GetChunks()))
for k, data := range chunks.GetChunks() {
chunk, err := new(encoding.Frame).Deserialize(data)
if len(bundle.GetBundle()) > 0 {
bundleMsg, err := new(core.Bundle).Deserialize(bundle.GetBundle())
if err != nil {
resultChan <- err
return
}
bundles[uint8(quorumID)][k] = chunk
bundles[uint8(quorumID)] = make([]*encoding.Frame, len(bundleMsg))
for k := 0; k < len(bundleMsg); k++ {
bundles[uint8(quorumID)][k] = bundleMsg[k]
}
} else {
bundles[uint8(quorumID)] = make([]*encoding.Frame, len(bundle.GetChunks()))
for k, data := range bundle.GetChunks() {
chunk, err := new(encoding.Frame).Deserialize(data)
if err != nil {
resultChan <- err
return
}
bundles[uint8(quorumID)][k] = chunk
}
}
}

Expand Down
138 changes: 106 additions & 32 deletions node/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ import (

"github.com/Layr-Labs/eigenda/api/grpc/node"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/Layr-Labs/eigenda/node/leveldb"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/consensys/gnark-crypto/ecc/bn254"
"github.com/ethereum/go-ethereum/common/hexutil"
"google.golang.org/protobuf/proto"
)
Expand Down Expand Up @@ -241,12 +243,38 @@ func (s *Store) StoreBatch(ctx context.Context, header *core.BatchHeader, blobs
if len(rawBlob.GetBundles()) != len(blob.Bundles) {
return nil, errors.New("internal error: the number of bundles in parsed blob must be the same as in raw blob")
}
// We expect all the bundles of the blob are either using combined bundle
// (with all chunks in a single byte array) or separate chunks, no mixed
// use.
usingBundleBytes := false
for _, bundle := range rawBlob.GetBundles() {
// If the blob is using combined bundle encoding, there must be at least
// one non-empty bundle (i.e. the node is in at least one quorum otherwise
// it shouldn't have received this blob).
if len(bundle.GetBundle()) > 0 {
usingBundleBytes = true
break
}
}
rawBundles := make(map[core.QuorumID][]byte)
rawChunks := make(map[core.QuorumID][][]byte)
for i, chunks := range rawBlob.GetBundles() {
for i, bundle := range rawBlob.GetBundles() {
quorumID := uint8(rawBlob.GetHeader().GetQuorumHeaders()[i].GetQuorumId())
rawChunks[quorumID] = make([][]byte, len(chunks.GetChunks()))
for j, chunk := range chunks.GetChunks() {
rawChunks[quorumID][j] = chunk
if usingBundleBytes {
if len(bundle.GetChunks()) > 0 && len(bundle.GetChunks()[0]) > 0 {
return nil, errors.New("chunks of a bundle are encoded together already")
}
// The bundle may be empty if the operator is not in a quorum (operators can
// be in just a subset of quorums). We need to store only the non-empty
// quorums.
if len(bundle.GetBundle()) > 0 {
rawBundles[quorumID] = bundle.GetBundle()
}
} else if len(bundle.GetChunks()) > 0 {
rawChunks[quorumID] = make([][]byte, len(bundle.GetChunks()))
for j, chunk := range bundle.GetChunks() {
rawChunks[quorumID][j] = chunk
}
}
}
serializationDuration += time.Since(start)
Expand All @@ -258,22 +286,33 @@ func (s *Store) StoreBatch(ctx context.Context, header *core.BatchHeader, blobs
log.Error("Cannot generate the key for storing blob:", "err", err)
return nil, err
}
if len(rawChunks[quorumID]) != len(bundle) {
return nil, errors.New("internal error: the number of chunks in parsed blob bundle must be the same as in raw blob bundle")
}

bundleRaw := make([][]byte, len(bundle))
for i := 0; i < len(bundle); i++ {
bundleRaw[i] = rawChunks[quorumID][i]
if usingBundleBytes {
rawBundle, ok := rawBundles[quorumID]
if ok {
size += int64(len(rawBundle))
keys = append(keys, key)
values = append(values, rawBundle)
}
} else {
if len(rawChunks[quorumID]) != len(bundle) {
return nil, errors.New("internal error: the number of chunks in parsed blob bundle must be the same as in raw blob bundle")
}
chunksBytes, ok := rawChunks[quorumID]
if ok {
bundleRaw := make([][]byte, len(bundle))
for i := 0; i < len(bundle); i++ {
bundleRaw[i] = chunksBytes[i]
}
chunkBytes, err := EncodeChunks(bundleRaw)
if err != nil {
return nil, err
}
size += int64(len(chunkBytes))
keys = append(keys, key)
values = append(values, chunkBytes)
}
}
chunkBytes, err := EncodeChunks(bundleRaw)
if err != nil {
return nil, err
}
size += int64(len(chunkBytes))

keys = append(keys, key)
values = append(values, chunkBytes)
}
encodingDuration += time.Since(start)
}
Expand Down Expand Up @@ -321,24 +360,24 @@ func (s *Store) GetBlobHeader(ctx context.Context, batchHeaderHash [32]byte, blo

// GetChunks returns the list of byte arrays stored for the given blobKey, the encoding
// format of those chunks, and a boolean indicating whether the read was successful and
// the chunks were deserialized correctly.
func (s *Store) GetChunks(ctx context.Context, batchHeaderHash [32]byte, blobIndex int, quorumID core.QuorumID) ([][]byte, bool) {
func (s *Store) GetChunks(ctx context.Context, batchHeaderHash [32]byte, blobIndex int, quorumID core.QuorumID) ([][]byte, node.ChunkEncoding, bool) {
log := s.logger

blobKey, err := EncodeBlobKey(batchHeaderHash, blobIndex, quorumID)
if err != nil {
return nil, false
return nil, node.ChunkEncoding_UNKNOWN, false
}
data, err := s.db.Get(blobKey)
if err != nil {
return nil, false
return nil, node.ChunkEncoding_UNKNOWN, false
}
log.Debug("Retrieved chunk", "blobKey", hexutil.Encode(blobKey), "length", len(data))

chunks, err := DecodeChunks(data)
chunks, format, err := DecodeChunks(data)
if err != nil {
return nil, false
return nil, format, false
}
return chunks, true
return chunks, format, true
}

// HasKey returns if a given key has been stored.
Expand Down Expand Up @@ -374,11 +413,30 @@ func EncodeChunks(chunks [][]byte) ([]byte, error) {
return result, nil
}

// Converts a flattened array of chunks into an array of its constituent chunks,
// throwing an error in case the chunks were not serialized correctly
//
// DecodeGnarkChunks splits a Gnark-encoded bundle into its fixed-size chunks.
//
// The first 8 bytes of data are a little-endian uint64 header: the most
// significant byte must equal core.GnarkBundleEncodingFormat and the remaining
// 56 bits hold the chunk length, which must be non-zero. Each chunk after the
// header occupies
// bn254.SizeOfG1AffineCompressed + encoding.BYTES_PER_SYMBOL*chunkLen bytes.
//
// The returned slices alias data; nothing is copied. An error is returned when
// data is shorter than the header, the format byte does not match, the chunk
// length is zero, or the payload is not a whole number of chunks.
func DecodeGnarkChunks(data []byte) ([][]byte, error) {
	// binary.LittleEndian.Uint64 panics on fewer than 8 bytes, so reject short
	// input explicitly instead of relying on every caller to pre-check.
	if len(data) < 8 {
		return nil, errors.New("data must have at least 8 bytes")
	}
	meta := binary.LittleEndian.Uint64(data)
	if (meta >> 56) != core.GnarkBundleEncodingFormat {
		return nil, errors.New("invalid bundle data encoding format")
	}
	// Clear the top (format) byte to obtain the 56-bit chunk length.
	chunkLen := meta << 8 >> 8
	if chunkLen == 0 {
		return nil, errors.New("chunk length must be greater than zero")
	}
	chunkSize := bn254.SizeOfG1AffineCompressed + encoding.BYTES_PER_SYMBOL*int(chunkLen)
	// int(chunkLen) can wrap where int is narrower than 56 bits; a non-positive
	// size would make the loop below slice out of range or never terminate.
	if chunkSize <= 0 {
		return nil, errors.New("invalid data to decode")
	}
	buf := data[8:]
	// Every chunk has the same size, so the number of chunks is known up front.
	chunks := make([][]byte, 0, len(buf)/chunkSize)
	for len(buf) > 0 {
		if len(buf) < chunkSize {
			return nil, errors.New("invalid data to decode")
		}
		chunks = append(chunks, buf[:chunkSize])
		buf = buf[chunkSize:]
	}
	return chunks, nil
}

// DecodeChunks((len(chunks[0]), chunks[0], len(chunks[1]), chunks[1], ...)) = chunks
func DecodeChunks(data []byte) ([][]byte, error) {
func DecodeGobChunks(data []byte) ([][]byte, error) {
chunks := make([][]byte, 0)
buf := data
for len(buf) > 0 {
Expand All @@ -391,15 +449,31 @@ func DecodeChunks(data []byte) ([][]byte, error) {
if len(buf) < int(chunkSize) {
return nil, errors.New("invalid data to decode")
}
chunk := buf[:chunkSize]
chunks = append(chunks, buf[:chunkSize])
buf = buf[chunkSize:]

chunks = append(chunks, chunk)
}

return chunks, nil
}

// DecodeChunks turns a flattened byte array back into its constituent chunks
// and reports which encoding the stored bytes use.
//
// The encoding is taken from the most significant byte of the leading
// little-endian uint64: 0 dispatches to DecodeGobChunks and 1 to
// DecodeGnarkChunks. Input shorter than 8 bytes, or carrying any other format
// byte, yields ChunkEncoding_UNKNOWN together with an error. A failure inside
// the selected decoder is returned alongside that decoder's encoding value.
func DecodeChunks(data []byte) ([][]byte, node.ChunkEncoding, error) {
	if len(data) < 8 {
		return nil, node.ChunkEncoding_UNKNOWN, errors.New("data must have at least 8 bytes")
	}

	// Pick the decoder and the encoding it implies from the header's top byte.
	var (
		decode func([]byte) ([][]byte, error)
		kind   node.ChunkEncoding
	)
	switch header := binary.LittleEndian.Uint64(data); header >> 56 {
	case 0:
		decode, kind = DecodeGobChunks, node.ChunkEncoding_GOB
	case 1:
		decode, kind = DecodeGnarkChunks, node.ChunkEncoding_GNARK
	default:
		return nil, node.ChunkEncoding_UNKNOWN, errors.New("invalid data encoding format")
	}

	// Both decoders receive the full input, header included.
	chunks, err := decode(data)
	return chunks, kind, err
}

func copyBytes(src []byte) []byte {
dst := make([]byte, len(src))
copy(dst, src)
Expand Down
Loading

0 comments on commit c7b0de7

Please sign in to comment.