diff --git a/disperser/batcher/batch_confirmer_test.go b/disperser/batcher/batch_confirmer_test.go index 449c9298db..866fae98ed 100644 --- a/disperser/batcher/batch_confirmer_test.go +++ b/disperser/batcher/batch_confirmer_test.go @@ -57,6 +57,7 @@ func makeBatchConfirmer(t *testing.T) *batchConfirmerComponents { dispatcher := dmock.NewDispatcher(state) blobStore := inmem.NewBlobStore() ethClient := &cmock.MockEthClient{} + finalizer := batmock.NewFinalizer() txnManager := batmock.NewTxnManager() minibatchStore := batcherinmem.NewMinibatchStore(logger) encodingWorkerPool := workerpool.New(10) @@ -76,7 +77,7 @@ func makeBatchConfirmer(t *testing.T) *batchConfirmerComponents { MaxNumConnections: 10, MaxNumRetriesPerBlob: 2, MaxNumRetriesPerDispersal: 1, - }, blobStore, minibatchStore, dispatcher, mockChainState, assignmentCoordinator, encodingStreamer, ethClient, pool, logger, metrics, handleMinibatchLivenessChan) + }, blobStore, minibatchStore, dispatcher, mockChainState, assignmentCoordinator, encodingStreamer, ethClient, finalizer, pool, logger, metrics, handleMinibatchLivenessChan) assert.NoError(t, err) config := bat.BatchConfirmerConfig{ diff --git a/disperser/batcher/minibatcher.go b/disperser/batcher/minibatcher.go index 3bb93c0241..96b6039a47 100644 --- a/disperser/batcher/minibatcher.go +++ b/disperser/batcher/minibatcher.go @@ -50,6 +50,7 @@ type Minibatcher struct { Metrics *Metrics ethClient common.EthClient + finalizer Finalizer logger logging.Logger } @@ -62,6 +63,7 @@ func NewMinibatcher( assignmentCoordinator core.AssignmentCoordinator, encodingStreamer *EncodingStreamer, ethClient common.EthClient, + finalizer Finalizer, workerpool common.WorkerPool, logger logging.Logger, metrics *Metrics, @@ -83,6 +85,7 @@ func NewMinibatcher( MinibatchIndex: 0, ethClient: ethClient, + finalizer: finalizer, logger: logger.With("component", "Minibatcher"), HeartbeatChan: heartbeatChan, Metrics: metrics, @@ -94,9 +97,18 @@ func (b *Minibatcher) Start(ctx context.Context) error { if err != nil { return err } + // Wait for few seconds for indexer to index blockchain // This won't be needed when we switch to using Graph node time.Sleep(indexerWarmupDelay) + err = b.EncodingStreamer.Start(ctx) + if err != nil { + return err + } + batchTrigger := b.EncodingStreamer.EncodedSizeNotifier + + b.finalizer.Start(ctx) + go func() { ticker := time.NewTicker(b.PullInterval) defer ticker.Stop() @@ -120,10 +132,24 @@ func (b *Minibatcher) Start(ctx context.Context) error { if cancel != nil { cancelFuncs = append(cancelFuncs, cancel) } + case <-batchTrigger.Notify: + ticker.Stop() + + cancel, err := b.HandleSingleMinibatch(ctx) + if err != nil { + if errors.Is(err, errNoEncodedResults) { + b.logger.Warn("no encoded results to construct minibatch") + } else { + b.logger.Error("failed to process minibatch", "err", err) + } + } + if cancel != nil { + cancelFuncs = append(cancelFuncs, cancel) + } + ticker.Reset(b.PullInterval) } } }() - return nil } diff --git a/disperser/batcher/minibatcher_test.go b/disperser/batcher/minibatcher_test.go index 376f875925..62faf2d89f 100644 --- a/disperser/batcher/minibatcher_test.go +++ b/disperser/batcher/minibatcher_test.go @@ -14,6 +14,7 @@ import ( "github.com/Layr-Labs/eigenda/disperser" "github.com/Layr-Labs/eigenda/disperser/batcher" "github.com/Layr-Labs/eigenda/disperser/batcher/inmem" + batchermock "github.com/Layr-Labs/eigenda/disperser/batcher/mock" dinmem "github.com/Layr-Labs/eigenda/disperser/common/inmem" dmock "github.com/Layr-Labs/eigenda/disperser/mock" "github.com/Layr-Labs/eigensdk-go/logging" @@ -83,6 +84,7 @@ func newMinibatcher(t *testing.T, config batcher.MinibatcherConfig) *minibatcher p, err := makeTestProver() assert.NoError(t, err) encoderClient := disperser.NewLocalEncoderClient(p) + finalizer := batchermock.NewFinalizer() asgn := &core.StdAssignmentCoordinator{} chainState.On("GetCurrentBlockNumber").Return(initialBlock, nil) metrics := batcher.NewMetrics("9100", logger) @@ -94,7 +96,7 @@ func newMinibatcher(t *testing.T, config batcher.MinibatcherConfig) *minibatcher assert.NoError(t, err) ethClient := &cmock.MockEthClient{} pool := workerpool.New(int(config.MaxNumConnections)) - m, err := batcher.NewMinibatcher(config, blobStore, minibatchStore, dispatcher, chainState, asgn, encodingStreamer, ethClient, pool, logger, metrics, handleMinibatchLivenessChan) + m, err := batcher.NewMinibatcher(config, blobStore, minibatchStore, dispatcher, chainState, asgn, encodingStreamer, ethClient, finalizer, pool, logger, metrics, handleMinibatchLivenessChan) assert.NoError(t, err) ics, err := chainState.GetIndexedOperatorState(context.Background(), 0, []core.QuorumID{0, 1}) assert.NoError(t, err) @@ -529,7 +531,8 @@ func TestMinibatcherTooManyPendingRequests(t *testing.T) { ctx := context.Background() mockWorkerPool := &cmock.MockWorkerpool{} // minibatcher with mock worker pool - m, err := batcher.NewMinibatcher(defaultConfig, c.blobStore, c.minibatchStore, c.dispatcher, c.minibatcher.ChainState, c.assignmentCoordinator, c.encodingStreamer, c.ethClient, mockWorkerPool, c.logger, c.metrics, handleMinibatchLivenessChan) + finalizer := batchermock.NewFinalizer() + m, err := batcher.NewMinibatcher(defaultConfig, c.blobStore, c.minibatchStore, c.dispatcher, c.minibatcher.ChainState, c.assignmentCoordinator, c.encodingStreamer, c.ethClient, finalizer, mockWorkerPool, c.logger, c.metrics, handleMinibatchLivenessChan) assert.NoError(t, err) mockWorkerPool.On("WaitingQueueSize").Return(int(defaultConfig.MaxNumConnections + 1)).Once() _, err = m.HandleSingleMinibatch(ctx) diff --git a/disperser/batcher/orchestrator.go b/disperser/batcher/orchestrator.go index 03d1fadf81..35b73c05b1 100644 --- a/disperser/batcher/orchestrator.go +++ b/disperser/batcher/orchestrator.go @@ -123,7 +123,7 @@ func NewOrchestrator( var batcher *Batcher var miniBatcher *Minibatcher if config.EnableMinibatch { - miniBatcher, err = NewMinibatcher(config.MinibatcherConfig, queue, minibatchStore, dispatcher, chainState, assignmentCoordinator, encodingStreamer, ethClient, encodingWorkerPool, logger, metrics, heartbeatChan) + miniBatcher, err = NewMinibatcher(config.MinibatcherConfig, queue, minibatchStore, dispatcher, chainState, assignmentCoordinator, encodingStreamer, ethClient, finalizer, encodingWorkerPool, logger, metrics, heartbeatChan) if err != nil { return nil, err } diff --git a/disperser/cmd/batcher/main.go b/disperser/cmd/batcher/main.go index 9c1226c3db..0150547e6b 100644 --- a/disperser/cmd/batcher/main.go +++ b/disperser/cmd/batcher/main.go @@ -249,11 +249,6 @@ func RunBatcher(ctx *cli.Context) error { minibatchStore = batchstore.NewMinibatchStore(dynamoClient, logger, config.BatcherConfig.MinibatcherConfig.BatchstoreTableName, time.Duration((storeDurationBlocks+blockStaleMeasure)*12)*time.Second) } - //batcher, err := batcher.NewBatcher(config.BatcherConfig, config.TimeoutConfig, queue, dispatcher, ics, asgn, encoderClient, agg, client, finalizer, tx, txnManager, logger, metrics, handleBatchLivenessChan) - //if err != nil { - // return err - //} - //batcher.Start(context.Background()) orchestrator, err := batcher.NewOrchestrator(config.BatcherConfig, config.TimeoutConfig, queue, minibatchStore, dispatcher, ics, asgn, encoderClient, agg, client, finalizer, tx, txnManager, logger, metrics, handleBatchLivenessChan) if err != nil { return err