Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[6/N] Chunk encoding optimization: Disperser/Retriever support for new chunk encoding #650

Merged
merged 8 commits into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion api/clients/node_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package clients

import (
"context"
"errors"
"time"

"github.com/Layr-Labs/eigenda/api/grpc/node"
Expand Down Expand Up @@ -120,7 +121,23 @@ func (c client) GetChunks(

chunks := make([]*encoding.Frame, len(reply.GetChunks()))
for i, data := range reply.GetChunks() {
chunk, err := new(encoding.Frame).Deserialize(data)
var chunk *encoding.Frame
switch reply.GetEncoding() {
case node.ChunkEncoding_GNARK:
chunk, err = new(encoding.Frame).DeserializeGnark(data)
case node.ChunkEncoding_GOB:
chunk, err = new(encoding.Frame).Deserialize(data)
case node.ChunkEncoding_UNKNOWN:
// For backward compatibility, we fallback the UNKNOWN to GNARK
chunk, err = new(encoding.Frame).DeserializeGnark(data)
if err != nil {
chunksChan <- RetrievedChunks{
OperatorID: opID,
Err: errors.New("UNKNOWN chunk encoding format"),
Chunks: nil,
}
}
}
if err != nil {
chunksChan <- RetrievedChunks{
OperatorID: opID,
Expand Down
66 changes: 44 additions & 22 deletions disperser/batcher/grpc/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ import (
)

type Config struct {
Timeout time.Duration
Timeout time.Duration
EnableGnarkBundleEncoding bool
}

type dispatcher struct {
Expand Down Expand Up @@ -127,7 +128,7 @@ func (c *dispatcher) sendChunks(ctx context.Context, blobs []*core.BlobMessage,
ctx, cancel := context.WithTimeout(ctx, c.Timeout)
defer cancel()
start := time.Now()
request, totalSize, err := GetStoreChunksRequest(blobs, batchHeader)
request, totalSize, err := GetStoreChunksRequest(blobs, batchHeader, c.EnableGnarkBundleEncoding)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -168,7 +169,7 @@ func (c *dispatcher) SendBlobsToOperator(ctx context.Context, blobs []*core.Blob
ctx, cancel := context.WithTimeout(ctx, c.Timeout)
defer cancel()
start := time.Now()
request, totalSize, err := GetStoreBlobsRequest(blobs, batchHeader)
request, totalSize, err := GetStoreBlobsRequest(blobs, batchHeader, c.EnableGnarkBundleEncoding)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -279,12 +280,12 @@ func (c *dispatcher) SendAttestBatchRequest(ctx context.Context, nodeDispersalCl
return &core.Signature{G1Point: point}, nil
}

func GetStoreChunksRequest(blobMessages []*core.BlobMessage, batchHeader *core.BatchHeader) (*node.StoreChunksRequest, int64, error) {
func GetStoreChunksRequest(blobMessages []*core.BlobMessage, batchHeader *core.BatchHeader, useGnarkBundleEncoding bool) (*node.StoreChunksRequest, int64, error) {
blobs := make([]*node.Blob, len(blobMessages))
totalSize := int64(0)
for i, blob := range blobMessages {
var err error
blobs[i], err = getBlobMessage(blob)
blobs[i], err = getBlobMessage(blob, useGnarkBundleEncoding)
if err != nil {
return nil, 0, err
}
Expand All @@ -299,12 +300,12 @@ func GetStoreChunksRequest(blobMessages []*core.BlobMessage, batchHeader *core.B
return request, totalSize, nil
}

func GetStoreBlobsRequest(blobMessages []*core.BlobMessage, batchHeader *core.BatchHeader) (*node.StoreBlobsRequest, int64, error) {
func GetStoreBlobsRequest(blobMessages []*core.BlobMessage, batchHeader *core.BatchHeader, useGnarkBundleEncoding bool) (*node.StoreBlobsRequest, int64, error) {
blobs := make([]*node.Blob, len(blobMessages))
totalSize := int64(0)
for i, blob := range blobMessages {
var err error
blobs[i], err = getBlobMessage(blob)
blobs[i], err = getBlobMessage(blob, useGnarkBundleEncoding)
if err != nil {
return nil, 0, err
}
Expand All @@ -319,7 +320,7 @@ func GetStoreBlobsRequest(blobMessages []*core.BlobMessage, batchHeader *core.Ba
return request, totalSize, nil
}

func getBlobMessage(blob *core.BlobMessage) (*node.Blob, error) {
func getBlobMessage(blob *core.BlobMessage, useGnarkBundleEncoding bool) (*node.Blob, error) {
if blob.BlobHeader == nil {
return nil, errors.New("blob header is nil")
}
Expand Down Expand Up @@ -356,22 +357,43 @@ func getBlobMessage(blob *core.BlobMessage) (*node.Blob, error) {
}
}

data, err := blob.Bundles.Serialize()
if err != nil {
return nil, err
}
bundles := make([]*node.Bundle, len(quorumHeaders))
// the ordering of quorums in bundles must be same as in quorumHeaders
for i, quorumHeader := range quorumHeaders {
quorum := quorumHeader.QuorumId
if _, ok := blob.Bundles[uint8(quorum)]; ok {
bundles[i] = &node.Bundle{
Chunks: data[quorum],
if useGnarkBundleEncoding {
// the ordering of quorums in bundles must be same as in quorumHeaders
for i, quorumHeader := range quorumHeaders {
quorum := quorumHeader.QuorumId
if bundle, ok := blob.Bundles[uint8(quorum)]; ok {
bundleBytes, err := bundle.Serialize()
if err != nil {
return nil, err
}
bundles[i] = &node.Bundle{
Bundle: bundleBytes,
}
} else {
bundles[i] = &node.Bundle{
// empty bundle for quorums operators are not part of
Bundle: make([]byte, 0),
}
}
} else {
bundles[i] = &node.Bundle{
// empty bundle for quorums operators are not part of
Chunks: make([][]byte, 0),
}
} else {
data, err := blob.Bundles.Serialize()
if err != nil {
return nil, err
}
// the ordering of quorums in bundles must be same as in quorumHeaders
for i, quorumHeader := range quorumHeaders {
quorum := quorumHeader.QuorumId
if _, ok := blob.Bundles[uint8(quorum)]; ok {
bundles[i] = &node.Bundle{
Chunks: data[quorum],
}
} else {
bundles[i] = &node.Bundle{
// empty bundle for quorums operators are not part of
Chunks: make([][]byte, 0),
}
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions disperser/cmd/batcher/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ type Config struct {
EigenDAServiceManagerAddr string

EnableMinibatch bool

EnableGnarkBundleEncoding bool
}

func NewConfig(ctx *cli.Context) (Config, error) {
Expand Down Expand Up @@ -88,6 +90,7 @@ func NewConfig(ctx *cli.Context) (Config, error) {
IndexerConfig: indexer.ReadIndexerConfig(ctx),
KMSKeyConfig: kmsConfig,
EnableMinibatch: ctx.Bool(flags.EnableMinibatchFlag.Name),
EnableGnarkBundleEncoding: ctx.Bool(flags.EnableGnarkBundleEncodingFlag.Name),
}
return config, nil
}
7 changes: 7 additions & 0 deletions disperser/cmd/batcher/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,12 @@ var (
EnvVar: common.PrefixEnvVar(envVarPrefix, "FINALIZATION_BLOCK_DELAY"),
Value: 75,
}
EnableGnarkBundleEncodingFlag = cli.BoolFlag{
Name: common.PrefixFlag(FlagPrefix, "enable-gnark-bundle-encoding"),
Usage: "Enable Gnark bundle encoding for chunks",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "ENABLE_GNARK_BUNDLE_ENCODING"),
}
// EnableMinibatchFlag is a flag to enable minibatch processing
// Defaults to false
EnableMinibatchFlag = cli.BoolFlag{
Expand Down Expand Up @@ -258,6 +264,7 @@ var optionalFlags = []cli.Flag{
MinibatcherPullIntervalFlag,
MaxNodeConnectionsFlag,
MaxNumRetriesPerDispersalFlag,
EnableGnarkBundleEncodingFlag,
}

// Flags contains the list of configuration options available to the binary.
Expand Down
3 changes: 2 additions & 1 deletion disperser/cmd/batcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ func RunBatcher(ctx *cli.Context) error {
metrics := batcher.NewMetrics(config.MetricsConfig.HTTPPort, logger)

dispatcher := dispatcher.NewDispatcher(&dispatcher.Config{
Timeout: config.TimeoutConfig.AttestationTimeout,
Timeout: config.TimeoutConfig.AttestationTimeout,
EnableGnarkBundleEncoding: config.EnableGnarkBundleEncoding,
}, logger, metrics.DispatcherMetrics)
asgn := &core.StdAssignmentCoordinator{}

Expand Down
2 changes: 1 addition & 1 deletion node/grpc/server_load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func TestStoreChunks(t *testing.T) {
numTotalChunks += len(blobMessagesByOp[opID][i].Bundles[0])
}
t.Logf("Batch numTotalChunks: %d", numTotalChunks)
req, totalSize, err := dispatcher.GetStoreChunksRequest(blobMessagesByOp[opID], batchHeader)
req, totalSize, err := dispatcher.GetStoreChunksRequest(blobMessagesByOp[opID], batchHeader, false)
fmt.Println("totalSize", totalSize)
assert.NoError(t, err)
assert.Equal(t, int64(26214400), totalSize)
Expand Down
Loading