Skip to content

Commit

Permalink
Enable minibatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
pschork committed Aug 14, 2024
1 parent 9a43392 commit 5296b08
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 16 deletions.
1 change: 1 addition & 0 deletions disperser/batcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type Config struct {

TargetNumChunks uint
MaxBlobsToFetchFromStore int
EnableMinibatch bool
}

type Batcher struct {
Expand Down
4 changes: 4 additions & 0 deletions disperser/batcher/batchstore/minibatch_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ const (
blobMinibatchMappingSKPrefix = "BLOB_MINIBATCH_MAPPING#"
)

type Config struct {
TableName string
}

type MinibatchStore struct {
dynamoDBClient *commondynamodb.Client
tableName string
Expand Down
38 changes: 25 additions & 13 deletions disperser/cmd/batcher/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/Layr-Labs/eigenda/common/geth"
"github.com/Layr-Labs/eigenda/core/thegraph"
"github.com/Layr-Labs/eigenda/disperser/batcher"
"github.com/Layr-Labs/eigenda/disperser/batcher/batchstore"
"github.com/Layr-Labs/eigenda/disperser/cmd/batcher/flags"
"github.com/Layr-Labs/eigenda/disperser/common/blobstore"
"github.com/Layr-Labs/eigenda/encoding/kzg"
Expand All @@ -14,18 +15,20 @@ import (
)

type Config struct {
BatcherConfig batcher.Config
TimeoutConfig batcher.TimeoutConfig
BlobstoreConfig blobstore.Config
EthClientConfig geth.EthClientConfig
AwsClientConfig aws.ClientConfig
EncoderConfig kzg.KzgConfig
LoggerConfig common.LoggerConfig
MetricsConfig batcher.MetricsConfig
IndexerConfig indexer.Config
KMSKeyConfig common.KMSKeyConfig
ChainStateConfig thegraph.Config
UseGraph bool
BatcherConfig batcher.Config
MinibatcherConfig batcher.MinibatcherConfig
TimeoutConfig batcher.TimeoutConfig
BlobstoreConfig blobstore.Config
MinibatchStoreConfig batchstore.Config
EthClientConfig geth.EthClientConfig
AwsClientConfig aws.ClientConfig
EncoderConfig kzg.KzgConfig
LoggerConfig common.LoggerConfig
MetricsConfig batcher.MetricsConfig
IndexerConfig indexer.Config
KMSKeyConfig common.KMSKeyConfig
ChainStateConfig thegraph.Config
UseGraph bool

IndexerDataDir string

Expand Down Expand Up @@ -69,6 +72,7 @@ func NewConfig(ctx *cli.Context) (Config, error) {
TargetNumChunks: ctx.GlobalUint(flags.TargetNumChunksFlag.Name),
MaxBlobsToFetchFromStore: ctx.GlobalInt(flags.MaxBlobsToFetchFromStoreFlag.Name),
FinalizationBlockDelay: ctx.GlobalUint(flags.FinalizationBlockDelayFlag.Name),
EnableMinibatch: ctx.Bool(flags.EnableMinibatchFlag.Name),
},
TimeoutConfig: batcher.TimeoutConfig{
EncodingTimeout: ctx.GlobalDuration(flags.EncodingTimeoutFlag.Name),
Expand All @@ -89,8 +93,16 @@ func NewConfig(ctx *cli.Context) (Config, error) {
IndexerDataDir: ctx.GlobalString(flags.IndexerDataDirFlag.Name),
IndexerConfig: indexer.ReadIndexerConfig(ctx),
KMSKeyConfig: kmsConfig,
EnableMinibatch: ctx.Bool(flags.EnableMinibatchFlag.Name),
EnableGnarkBundleEncoding: ctx.Bool(flags.EnableGnarkBundleEncodingFlag.Name),
MinibatcherConfig: batcher.MinibatcherConfig{
PullInterval: ctx.GlobalDuration(flags.MinibatcherPullIntervalFlag.Name),
MaxNumConnections: ctx.GlobalUint(flags.MaxNodeConnectionsFlag.Name),
MaxNumRetriesPerBlob: ctx.GlobalUint(flags.MaxNumRetriesPerBlobFlag.Name),
MaxNumRetriesPerDispersal: ctx.GlobalUint(flags.MaxNumRetriesPerDispersalFlag.Name),
},
MinibatchStoreConfig: batchstore.Config{
TableName: ctx.GlobalString(flags.MinibatchStoreTableNameFlag.Name),
},
}
return config, nil
}
7 changes: 7 additions & 0 deletions disperser/cmd/batcher/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,12 @@ var (
EnvVar: common.PrefixEnvVar(envVarPrefix, "MAX_NUM_RETRIES_PER_DISPERSAL"),
Value: 3,
}
MinibatchStoreTableNameFlag = cli.StringFlag{
Name: common.PrefixFlag(FlagPrefix, "minibatch-store-table-name"),
Usage: "Name of the dynamodb table to store minibatch metadata",
Required: true,
EnvVar: common.PrefixEnvVar(envVarPrefix, "MINIBATCH_STORE_TABLE_NAME"),
}
)

var requiredFlags = []cli.Flag{
Expand Down Expand Up @@ -265,6 +271,7 @@ var optionalFlags = []cli.Flag{
MaxNodeConnectionsFlag,
MaxNumRetriesPerDispersalFlag,
EnableGnarkBundleEncodingFlag,
MinibatchStoreTableNameFlag,
}

// Flags contains the list of configuration options available to the binary.
Expand Down
34 changes: 31 additions & 3 deletions disperser/cmd/batcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/Layr-Labs/eigenda/core"
coreeth "github.com/Layr-Labs/eigenda/core/eth"
"github.com/Layr-Labs/eigenda/disperser/batcher"
"github.com/Layr-Labs/eigenda/disperser/batcher/batchstore"
dispatcher "github.com/Layr-Labs/eigenda/disperser/batcher/grpc"
"github.com/Layr-Labs/eigenda/disperser/cmd/batcher/flags"
"github.com/Layr-Labs/eigenda/disperser/common/blobstore"
Expand All @@ -28,6 +29,7 @@ import (
gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rpc"
"github.com/gammazero/workerpool"
"github.com/urfave/cli"
)

Expand Down Expand Up @@ -239,9 +241,35 @@ func RunBatcher(ctx *cli.Context) error {
logger.Info("Enabled metrics for Batcher", "socket", httpSocket)
}

if config.EnableMinibatch {
// TODO: implement and run batchConfirmer for minibatch
return errors.New("minibatch is not supported")
if config.BatcherConfig.EnableMinibatch {
minibatchStore := batchstore.NewMinibatchStore(dynamoClient, logger, config.MinibatchStoreConfig.TableName, time.Duration((storeDurationBlocks+blockStaleMeasure)*12)*time.Second)
streamerConfig := batcher.StreamerConfig{
SRSOrder: config.BatcherConfig.SRSOrder,
EncodingRequestTimeout: config.BatcherConfig.PullInterval,
EncodingQueueLimit: config.BatcherConfig.EncodingRequestQueueSize,
TargetNumChunks: config.BatcherConfig.TargetNumChunks,
MaxBlobsToFetchFromStore: config.BatcherConfig.MaxBlobsToFetchFromStore,
FinalizationBlockDelay: config.BatcherConfig.FinalizationBlockDelay,
ChainStateTimeout: config.TimeoutConfig.ChainStateTimeout,
}
encodingWorkerPool := workerpool.New(config.BatcherConfig.NumConnections)
batchTrigger := batcher.NewEncodedSizeNotifier(
make(chan struct{}, 1),
uint64(config.BatcherConfig.BatchSizeMBLimit)*1024*1024, // convert to bytes
)
encodingStreamer, err := batcher.NewEncodingStreamer(streamerConfig, queue, ics, encoderClient, asgn, batchTrigger, encodingWorkerPool, metrics.EncodingStreamerMetrics, logger)
if err != nil {
return err
}
pool := workerpool.New(int(config.MinibatcherConfig.MaxNumConnections))
minibatcher, err := batcher.NewMinibatcher(config.MinibatcherConfig, queue, minibatchStore, dispatcher, ics, asgn, encodingStreamer, client, pool, logger)
if err != nil {
return err
}
err = minibatcher.Start(context.Background())
if err != nil {
return err
}
} else {
batcher, err := batcher.NewBatcher(config.BatcherConfig, config.TimeoutConfig, queue, dispatcher, ics, asgn, encoderClient, agg, client, finalizer, tx, txnManager, logger, metrics, handleBatchLivenessChan)
if err != nil {
Expand Down

0 comments on commit 5296b08

Please sign in to comment.