diff --git a/api/clients/node_client.go b/api/clients/node_client.go index daca5b6e53..faac36944b 100644 --- a/api/clients/node_client.go +++ b/api/clients/node_client.go @@ -2,6 +2,7 @@ package clients import ( "context" + "errors" "time" "github.com/Layr-Labs/eigenda/api/grpc/node" @@ -120,7 +121,23 @@ func (c client) GetChunks( chunks := make([]*encoding.Frame, len(reply.GetChunks())) for i, data := range reply.GetChunks() { - chunk, err := new(encoding.Frame).Deserialize(data) + var chunk *encoding.Frame + switch reply.GetEncoding() { + case node.ChunkEncoding_GNARK: + chunk, err = new(encoding.Frame).DeserializeGnark(data) + case node.ChunkEncoding_GOB: + chunk, err = new(encoding.Frame).Deserialize(data) + case node.ChunkEncoding_UNKNOWN: + // For backward compatibility, we fallback the UNKNOWN to GNARK + chunk, err = new(encoding.Frame).DeserializeGnark(data) + if err != nil { + chunksChan <- RetrievedChunks{ + OperatorID: opID, + Err: errors.New("UNKNOWN chunk encoding format"), + Chunks: nil, + } + } + } if err != nil { chunksChan <- RetrievedChunks{ OperatorID: opID, diff --git a/disperser/batcher/grpc/dispatcher.go b/disperser/batcher/grpc/dispatcher.go index 99399ae25f..ca84504ffb 100644 --- a/disperser/batcher/grpc/dispatcher.go +++ b/disperser/batcher/grpc/dispatcher.go @@ -19,7 +19,8 @@ import ( ) type Config struct { - Timeout time.Duration + Timeout time.Duration + EnableGnarkBundleEncoding bool } type dispatcher struct { @@ -127,7 +128,7 @@ func (c *dispatcher) sendChunks(ctx context.Context, blobs []*core.BlobMessage, ctx, cancel := context.WithTimeout(ctx, c.Timeout) defer cancel() start := time.Now() - request, totalSize, err := GetStoreChunksRequest(blobs, batchHeader) + request, totalSize, err := GetStoreChunksRequest(blobs, batchHeader, c.EnableGnarkBundleEncoding) if err != nil { return nil, err } @@ -168,7 +169,7 @@ func (c *dispatcher) SendBlobsToOperator(ctx context.Context, blobs []*core.Blob ctx, cancel := context.WithTimeout(ctx, c.Timeout) defer cancel() start := time.Now() - request, totalSize, err := GetStoreBlobsRequest(blobs, batchHeader) + request, totalSize, err := GetStoreBlobsRequest(blobs, batchHeader, c.EnableGnarkBundleEncoding) if err != nil { return nil, err } @@ -279,12 +280,12 @@ func (c *dispatcher) SendAttestBatchRequest(ctx context.Context, nodeDispersalCl return &core.Signature{G1Point: point}, nil } -func GetStoreChunksRequest(blobMessages []*core.BlobMessage, batchHeader *core.BatchHeader) (*node.StoreChunksRequest, int64, error) { +func GetStoreChunksRequest(blobMessages []*core.BlobMessage, batchHeader *core.BatchHeader, useGnarkBundleEncoding bool) (*node.StoreChunksRequest, int64, error) { blobs := make([]*node.Blob, len(blobMessages)) totalSize := int64(0) for i, blob := range blobMessages { var err error - blobs[i], err = getBlobMessage(blob) + blobs[i], err = getBlobMessage(blob, useGnarkBundleEncoding) if err != nil { return nil, 0, err } @@ -299,12 +300,12 @@ func GetStoreChunksRequest(blobMessages []*core.BlobMessage, batchHeader *core.B return request, totalSize, nil } -func GetStoreBlobsRequest(blobMessages []*core.BlobMessage, batchHeader *core.BatchHeader) (*node.StoreBlobsRequest, int64, error) { +func GetStoreBlobsRequest(blobMessages []*core.BlobMessage, batchHeader *core.BatchHeader, useGnarkBundleEncoding bool) (*node.StoreBlobsRequest, int64, error) { blobs := make([]*node.Blob, len(blobMessages)) totalSize := int64(0) for i, blob := range blobMessages { var err error - blobs[i], err = getBlobMessage(blob) + blobs[i], err = getBlobMessage(blob, useGnarkBundleEncoding) if err != nil { return nil, 0, err } @@ -319,7 +320,7 @@ func GetStoreBlobsRequest(blobMessages []*core.BlobMessage, batchHeader *core.Ba return request, totalSize, nil } -func getBlobMessage(blob *core.BlobMessage) (*node.Blob, error) { +func getBlobMessage(blob *core.BlobMessage, useGnarkBundleEncoding bool) (*node.Blob, error) { if blob.BlobHeader == nil { return nil, errors.New("blob header is nil") } @@ -356,22 +357,43 @@ func getBlobMessage(blob *core.BlobMessage) (*node.Blob, error) { } } - data, err := blob.Bundles.Serialize() - if err != nil { - return nil, err - } bundles := make([]*node.Bundle, len(quorumHeaders)) - // the ordering of quorums in bundles must be same as in quorumHeaders - for i, quorumHeader := range quorumHeaders { - quorum := quorumHeader.QuorumId - if _, ok := blob.Bundles[uint8(quorum)]; ok { - bundles[i] = &node.Bundle{ - Chunks: data[quorum], + if useGnarkBundleEncoding { + // the ordering of quorums in bundles must be same as in quorumHeaders + for i, quorumHeader := range quorumHeaders { + quorum := quorumHeader.QuorumId + if bundle, ok := blob.Bundles[uint8(quorum)]; ok { + bundleBytes, err := bundle.Serialize() + if err != nil { + return nil, err + } + bundles[i] = &node.Bundle{ + Bundle: bundleBytes, + } + } else { + bundles[i] = &node.Bundle{ + // empty bundle for quorums operators are not part of + Bundle: make([]byte, 0), + } } - } else { - bundles[i] = &node.Bundle{ - // empty bundle for quorums operators are not part of - Chunks: make([][]byte, 0), + } + } else { + data, err := blob.Bundles.Serialize() + if err != nil { + return nil, err + } + // the ordering of quorums in bundles must be same as in quorumHeaders + for i, quorumHeader := range quorumHeaders { + quorum := quorumHeader.QuorumId + if _, ok := blob.Bundles[uint8(quorum)]; ok { + bundles[i] = &node.Bundle{ + Chunks: data[quorum], + } + } else { + bundles[i] = &node.Bundle{ + // empty bundle for quorums operators are not part of + Chunks: make([][]byte, 0), + } } } } diff --git a/disperser/cmd/batcher/config.go b/disperser/cmd/batcher/config.go index 7918a5e9f0..9ad499aa40 100644 --- a/disperser/cmd/batcher/config.go +++ b/disperser/cmd/batcher/config.go @@ -33,6 +33,8 @@ type Config struct { EigenDAServiceManagerAddr string EnableMinibatch bool + + EnableGnarkBundleEncoding bool } func NewConfig(ctx *cli.Context) (Config, error) { @@ -88,6 +90,7 @@ func NewConfig(ctx *cli.Context) (Config, error) { IndexerConfig: indexer.ReadIndexerConfig(ctx), KMSKeyConfig: kmsConfig, EnableMinibatch: ctx.Bool(flags.EnableMinibatchFlag.Name), + EnableGnarkBundleEncoding: ctx.Bool(flags.EnableGnarkBundleEncodingFlag.Name), } return config, nil } diff --git a/disperser/cmd/batcher/flags/flags.go b/disperser/cmd/batcher/flags/flags.go index 3b0cadbd5f..91c5f5ea6f 100644 --- a/disperser/cmd/batcher/flags/flags.go +++ b/disperser/cmd/batcher/flags/flags.go @@ -193,6 +193,12 @@ var ( EnvVar: common.PrefixEnvVar(envVarPrefix, "FINALIZATION_BLOCK_DELAY"), Value: 75, } + EnableGnarkBundleEncodingFlag = cli.BoolFlag{ + Name: common.PrefixFlag(FlagPrefix, "enable-gnark-bundle-encoding"), + Usage: "Enable Gnark bundle encoding for chunks", + Required: false, + EnvVar: common.PrefixEnvVar(envVarPrefix, "ENABLE_GNARK_BUNDLE_ENCODING"), + } // EnableMinibatchFlag is a flag to enable minibatch processing // Defaults to false EnableMinibatchFlag = cli.BoolFlag{ @@ -258,6 +264,7 @@ var optionalFlags = []cli.Flag{ MinibatcherPullIntervalFlag, MaxNodeConnectionsFlag, MaxNumRetriesPerDispersalFlag, + EnableGnarkBundleEncodingFlag, } // Flags contains the list of configuration options available to the binary. diff --git a/disperser/cmd/batcher/main.go b/disperser/cmd/batcher/main.go index 49868b4a3c..3735940fe3 100644 --- a/disperser/cmd/batcher/main.go +++ b/disperser/cmd/batcher/main.go @@ -99,7 +99,8 @@ func RunBatcher(ctx *cli.Context) error { metrics := batcher.NewMetrics(config.MetricsConfig.HTTPPort, logger) dispatcher := dispatcher.NewDispatcher(&dispatcher.Config{ - Timeout: config.TimeoutConfig.AttestationTimeout, + Timeout: config.TimeoutConfig.AttestationTimeout, + EnableGnarkBundleEncoding: config.EnableGnarkBundleEncoding, }, logger, metrics.DispatcherMetrics) asgn := &core.StdAssignmentCoordinator{} diff --git a/node/grpc/server_load_test.go b/node/grpc/server_load_test.go index 88a5a8256b..a0ab6bd72d 100644 --- a/node/grpc/server_load_test.go +++ b/node/grpc/server_load_test.go @@ -103,7 +103,7 @@ func TestStoreChunks(t *testing.T) { numTotalChunks += len(blobMessagesByOp[opID][i].Bundles[0]) } t.Logf("Batch numTotalChunks: %d", numTotalChunks) - req, totalSize, err := dispatcher.GetStoreChunksRequest(blobMessagesByOp[opID], batchHeader) + req, totalSize, err := dispatcher.GetStoreChunksRequest(blobMessagesByOp[opID], batchHeader, false) fmt.Println("totalSize", totalSize) assert.NoError(t, err) assert.Equal(t, int64(26214400), totalSize)