diff --git a/disperser/cmd/batcher/config.go b/disperser/cmd/batcher/config.go index 50a50db6b3..7918a5e9f0 100644 --- a/disperser/cmd/batcher/config.go +++ b/disperser/cmd/batcher/config.go @@ -31,6 +31,8 @@ type Config struct { BLSOperatorStateRetrieverAddr string EigenDAServiceManagerAddr string + + EnableMinibatch bool } func NewConfig(ctx *cli.Context) (Config, error) { @@ -85,6 +87,7 @@ func NewConfig(ctx *cli.Context) (Config, error) { IndexerDataDir: ctx.GlobalString(flags.IndexerDataDirFlag.Name), IndexerConfig: indexer.ReadIndexerConfig(ctx), KMSKeyConfig: kmsConfig, + EnableMinibatch: ctx.Bool(flags.EnableMinibatchFlag.Name), } return config, nil } diff --git a/disperser/cmd/batcher/flags/flags.go b/disperser/cmd/batcher/flags/flags.go index ad3fdb91e6..6dddb76cb0 100644 --- a/disperser/cmd/batcher/flags/flags.go +++ b/disperser/cmd/batcher/flags/flags.go @@ -193,6 +193,14 @@ var ( EnvVar: common.PrefixEnvVar(envVarPrefix, "FINALIZATION_BLOCK_DELAY"), Value: 75, } + // EnableMinibatchFlag is a flag to enable minibatch processing + // Defaults to false + EnableMinibatchFlag = cli.BoolFlag{ + Name: common.PrefixFlag(FlagPrefix, "enable-minibatch"), + Usage: "Enable minibatch processing", + Required: false, + EnvVar: common.PrefixEnvVar(envVarPrefix, "ENABLE_MINIBATCH"), + } ) var requiredFlags = []cli.Flag{ @@ -225,6 +233,7 @@ var optionalFlags = []cli.Flag{ TargetNumChunksFlag, MaxBlobsToFetchFromStoreFlag, FinalizationBlockDelayFlag, + EnableMinibatchFlag, } // Flags contains the list of configuration options available to the binary. diff --git a/disperser/cmd/batcher/main.go b/disperser/cmd/batcher/main.go index d2811bcb34..49868b4a3c 100644 --- a/disperser/cmd/batcher/main.go +++ b/disperser/cmd/batcher/main.go @@ -230,10 +230,6 @@ func RunBatcher(ctx *cli.Context) error { } finalizer := batcher.NewFinalizer(config.TimeoutConfig.ChainReadTimeout, config.BatcherConfig.FinalizerInterval, queue, client, rpcClient, config.BatcherConfig.MaxNumRetriesPerBlob, 1000, config.BatcherConfig.FinalizerPoolSize, logger, metrics.FinalizerMetrics) txnManager := batcher.NewTxnManager(client, wallet, config.EthClientConfig.NumConfirmations, 20, config.TimeoutConfig.TxnBroadcastTimeout, config.TimeoutConfig.ChainWriteTimeout, logger, metrics.TxnManagerMetrics) - batcher, err := batcher.NewBatcher(config.BatcherConfig, config.TimeoutConfig, queue, dispatcher, ics, asgn, encoderClient, agg, client, finalizer, tx, txnManager, logger, metrics, handleBatchLivenessChan) - if err != nil { - return err - } // Enable Metrics Block if config.MetricsConfig.EnableMetrics { @@ -242,18 +238,25 @@ func RunBatcher(ctx *cli.Context) error { logger.Info("Enabled metrics for Batcher", "socket", httpSocket) } - err = batcher.Start(context.Background()) - if err != nil { - return err + if config.EnableMinibatch { + // TODO: implement and run batchConfirmer for minibatch + return errors.New("minibatch is not supported") + } else { + batcher, err := batcher.NewBatcher(config.BatcherConfig, config.TimeoutConfig, queue, dispatcher, ics, asgn, encoderClient, agg, client, finalizer, tx, txnManager, logger, metrics, handleBatchLivenessChan) + if err != nil { + return err + } + err = batcher.Start(context.Background()) + if err != nil { + return err + } } // Signal readiness if _, err := os.Create(readinessProbePath); err != nil { log.Printf("Failed to create readiness file: %v at path %v \n", err, readinessProbePath) } - return nil - } // process liveness signal from handleBatch Go Routine